# Get all Historical Trades where DTE > 0

In [72]:
import pandas as pd
from sqlalchemy import create_engine, text
import os
from datetime import datetime, timedelta
import sys
import time

# Database connection setup
DB_CONFIG = {
    'dbname': 'defaultdb',
    'user': 'doadmin',
    'password': 'AVNS_SrG4Bo3B7uCNEPONkE4',
    'host': 'vvv-trading-db-do-user-2110609-0.i.db.ondigitalocean.com',
    'port': '25060'
}

DATABASE_URL = f"postgresql://{DB_CONFIG['user']}:{DB_CONFIG['password']}@{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['dbname']}"
engine = create_engine(DATABASE_URL, connect_args={'sslmode': 'require'})

# Create data directory
os.makedirs('data', exist_ok=True)

# Generate filenames with current timestamp
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
darkpool_filename = f'data/darkpool_trades_historical_{timestamp}.csv'
options_filename = f'data/options_flow_historical_{timestamp}.csv'

print("Fetching ALL historical dark pool trades...")
print("This might take some time depending on the database size...")

# Modified query for dark pool trades - fetch all historical
darkpool_query = """
SELECT 
    t.*,
    date_trunc('hour', t.executed_at) as trade_hour,
    t.price - t.nbbo_bid as price_impact,
    CASE 
        WHEN t.nbbo_bid IS NULL OR t.nbbo_bid = 0 THEN NULL
        ELSE (t.price - t.nbbo_bid) / t.nbbo_bid
    END as price_impact_pct,
    CASE 
        WHEN t.size >= 10000 THEN 'Block Trade'
        WHEN t.premium >= 1000000 THEN 'High Premium'
        ELSE 'Regular'
    END as trade_type,
    count(*) over (partition by t.symbol, date_trunc('hour', t.executed_at)) as trades_per_hour,
    sum(t.size) over (partition by t.symbol, date_trunc('hour', t.executed_at)) as volume_per_hour
FROM trading.darkpool_trades t
ORDER BY t.executed_at DESC
"""

# Modified query for options flow - fetch all historical
options_query = """
SELECT 
    f.*,
    date_trunc('hour', f.collected_at) as flow_hour,
    CASE 
        WHEN f.premium >= 1000000 THEN 'Whale'
        WHEN f.premium >= 100000 THEN 'Large'
        ELSE 'Regular'
    END as flow_size,
    count(*) over (partition by f.symbol, date_trunc('hour', f.collected_at)) as flows_per_hour,
    sum(f.premium) over (partition by f.symbol, date_trunc('hour', f.collected_at)) as premium_per_hour,
    sum(f.contract_size) over (partition by f.symbol, date_trunc('hour', f.collected_at)) as contracts_per_hour
FROM trading.options_flow f
ORDER BY f.collected_at DESC
"""

def fetch_in_chunks(query, engine, filename, chunk_size=10000):
    """Fetch data in chunks to avoid memory issues with large datasets."""
    start_time = time.time()
    connection = engine.connect().execution_options(stream_results=True)
    chunks = []
    
    try:
        print(f"Executing query...")
        result = connection.execute(text(query))
        
        total_rows = 0
        chunk_num = 0
        
        while True:
            try:
                chunk = result.fetchmany(chunk_size)
                if not chunk:
                    break
                    
                chunk_df = pd.DataFrame(chunk, columns=result.keys())
                chunks.append(chunk_df)
                total_rows += len(chunk_df)
                chunk_num += 1
                
                elapsed = time.time() - start_time
                rows_per_sec = total_rows / elapsed if elapsed > 0 else 0
                
                print(f"Fetched {total_rows} rows so far... ({rows_per_sec:.2f} rows/sec, chunk {chunk_num})")
                
                # Write chunk to disk to avoid memory issues
                if len(chunks) >= 5:  # After accumulating 5 chunks
                    print(f"Writing chunks to {filename}...")
                    combined = pd.concat(chunks, ignore_index=True)
                    if not os.path.exists(filename):
                        combined.to_csv(filename, index=False)
                    else:
                        combined.to_csv(filename, mode='a', header=False, index=False)
                    chunks = []  # Clear the chunks from memory
                    print(f"Wrote {len(combined)} rows to file, continuing fetch...")
            except Exception as e:
                print(f"Error processing chunk {chunk_num}: {str(e)}")
                continue
                
        # Process any remaining chunks
        if chunks:
            print(f"Writing final chunks to {filename}...")
            combined = pd.concat(chunks, ignore_index=True)
            if not os.path.exists(filename):
                combined.to_csv(filename, index=False)
            else:
                combined.to_csv(filename, mode='a', header=False, index=False)
                
        total_time = time.time() - start_time
        print(f"Fetch completed in {total_time:.2f} seconds")
        return total_rows
        
    except Exception as e:
        print(f"Error in fetch_in_chunks: {str(e)}")
        return 0
    finally:
        connection.close()

# Fetch and save darkpool trades in chunks
print("\nFetching and saving darkpool trades...")
total_darkpool_rows = fetch_in_chunks(darkpool_query, engine, darkpool_filename)
print(f"Completed saving {total_darkpool_rows} darkpool trades to {darkpool_filename}")

# Fetch and save options flow data in chunks
print("\nFetching and saving options flow data...")
total_options_rows = fetch_in_chunks(options_query, engine, options_filename)
print(f"Completed saving {total_options_rows} option flows to {options_filename}")

print("\nFull data fetch complete.")
print("\nGenerating summary statistics...")

# Process and summarize data from the saved files
try:
    # Load a sample of the data to generate summary statistics
    trades_sample = pd.read_csv(darkpool_filename, nrows=100000)
    options_sample = pd.read_csv(options_filename, nrows=100000)
    
    # Process darkpool trades
    trades_sample['executed_at'] = pd.to_datetime(trades_sample['executed_at'])
    if 'collection_time' in trades_sample.columns:
        trades_sample['collection_time'] = pd.to_datetime(trades_sample['collection_time'])
    trades_sample['trade_hour'] = pd.to_datetime(trades_sample['trade_hour'])
    
    # Process options flow
    options_sample['collected_at'] = pd.to_datetime(options_sample['collected_at'])
    if 'created_at' in options_sample.columns:
        options_sample['created_at'] = pd.to_datetime(options_sample['created_at'])
    if 'expiry' in options_sample.columns:
        options_sample['expiry'] = pd.to_datetime(options_sample['expiry'])
    options_sample['flow_hour'] = pd.to_datetime(options_sample['flow_hour'])
    
    # Print darkpool trade summary (from sample)
    print("\nDarkpool Trade sample summary by symbol (first 100k rows):")
    print(trades_sample.groupby('symbol').agg({
        'size': ['count', 'sum', 'mean'],
        'premium': ['mean', 'max'],
        'price_impact_pct': 'mean'
    }).round(2))
    
    # Print options flow summary (from sample)
    print("\nOptions Flow sample summary by symbol (first 100k rows):")
    print(options_sample.groupby('symbol').agg({
        'premium': ['count', 'sum', 'mean', 'max'],
        'contract_size': ['sum', 'mean'],
        'iv_rank': 'mean'
    }).round(2))
    
    # Print date ranges for samples
    print("\nDate ranges (from samples):")
    print("Darkpool Trades:")
    print(f"Earliest trade in sample: {trades_sample['executed_at'].min()}")
    print(f"Latest trade in sample: {trades_sample['executed_at'].max()}")
    print(f"Total trades fetched: {total_darkpool_rows}")
    
    print("\nOptions Flow:")
    print(f"Earliest flow in sample: {options_sample['collected_at'].min()}")
    print(f"Latest flow in sample: {options_sample['collected_at'].max()}")
    print(f"Total flows fetched: {total_options_rows}")
    
except Exception as e:
    print(f"Error generating summary statistics: {str(e)}")
    print("Data has been saved to files, but summary statistics could not be generated.")

print("\nFull data is available in:")
print(f"- {darkpool_filename}")
print(f"- {options_filename}")

Fetching ALL historical dark pool trades...
This might take some time depending on the database size...

Fetching and saving darkpool trades...
Executing query...
Fetched 10000 rows so far... (5023.94 rows/sec, chunk 1)
Fetched 20000 rows so far... (8349.21 rows/sec, chunk 2)
Fetched 30000 rows so far... (11016.54 rows/sec, chunk 3)
Fetched 40000 rows so far... (13205.39 rows/sec, chunk 4)
Fetched 46704 rows so far... (14101.23 rows/sec, chunk 5)
Writing chunks to data/darkpool_trades_historical_20250509_160137.csv...


  combined = pd.concat(chunks, ignore_index=True)


Wrote 46704 rows to file, continuing fetch...
Fetch completed in 4.64 seconds
Completed saving 46704 darkpool trades to data/darkpool_trades_historical_20250509_160137.csv

Fetching and saving options flow data...
Executing query...
Fetch completed in 0.25 seconds
Completed saving 0 option flows to data/options_flow_historical_20250509_160137.csv

Full data fetch complete.

Generating summary statistics...
Error generating summary statistics: [Errno 2] No such file or directory: 'data/options_flow_historical_20250509_160137.csv'
Data has been saved to files, but summary statistics could not be generated.

Full data is available in:
- data/darkpool_trades_historical_20250509_160137.csv
- data/options_flow_historical_20250509_160137.csv


  trades_sample = pd.read_csv(darkpool_filename, nrows=100000)


# Get Trades for Last 24 Hours

In [2]:
pip install pandas

Collecting pandas
  Using cached pandas-2.2.3-cp313-cp313-macosx_11_0_arm64.whl.metadata (89 kB)
Collecting numpy>=1.26.0 (from pandas)
  Using cached numpy-2.2.5-cp313-cp313-macosx_11_0_arm64.whl.metadata (116 kB)
Using cached pandas-2.2.3-cp313-cp313-macosx_11_0_arm64.whl (11.3 MB)
Using cached numpy-2.2.5-cp313-cp313-macosx_11_0_arm64.whl (14.2 MB)
Installing collected packages: numpy, pandas
[2K   [38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2/2[0m [pandas]━━━━[0m [32m1/2[0m [pandas]
[1A[2KSuccessfully installed numpy-2.2.5 pandas-2.2.3
Note: you may need to restart the kernel to use updated packages.


In [80]:
import pandas as pd
from sqlalchemy import create_engine
import os
from datetime import datetime, timedelta

# Database connection setup (same as before)
DB_CONFIG = {
    'dbname': 'defaultdb',
    'user': 'doadmin',
    'password': 'AVNS_SrG4Bo3B7uCNEPONkE4',
    'host': 'vvv-trading-db-do-user-2110609-0.i.db.ondigitalocean.com',
    'port': '25060'
}

DATABASE_URL = f"postgresql://{DB_CONFIG['user']}:{DB_CONFIG['password']}@{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['dbname']}"
engine = create_engine(DATABASE_URL, connect_args={'sslmode': 'require'})

# Calculate timestamp for 24 hours ago
twenty_four_hours_ago = datetime.now() - timedelta(hours=24)

# List of symbols to include
SYMBOLS = ["SPY", "QQQ", "GLD"]

# Query for dark pool trades from last 24 hours for selected symbols
darkpool_query = """
SELECT 
    t.*,
    date_trunc('hour', t.executed_at) as trade_hour,
    t.price - t.nbbo_bid as price_impact,
    CASE 
        WHEN t.nbbo_bid != 0 THEN (t.price - t.nbbo_bid) / t.nbbo_bid
        ELSE NULL
    END as price_impact_pct,
    CASE 
        WHEN t.size >= 10000 THEN 'Block Trade'
        WHEN t.premium >= 0.02 THEN 'High Premium'
        ELSE 'Regular'
    END as trade_type,
    count(*) over (partition by t.symbol, date_trunc('hour', t.executed_at)) as trades_per_hour,
    sum(t.size) over (partition by t.symbol, date_trunc('hour', t.executed_at)) as volume_per_hour
FROM trading.darkpool_trades t
WHERE t.executed_at >= %(twenty_four_hours_ago)s
  AND t.symbol = ANY(%(symbols)s)
ORDER BY t.executed_at DESC
"""

# Query for options flow from last 24 hours for selected symbols
options_query = """
SELECT 
    f.*,
    date_trunc('hour', f.collected_at) as flow_hour,
    CASE 
        WHEN f.premium >= 1000000 THEN 'Whale'
        WHEN f.premium >= 100000 THEN 'Large'
        ELSE 'Regular'
    END as flow_size,
    count(*) over (partition by f.symbol, date_trunc('hour', f.collected_at)) as flows_per_hour,
    sum(f.premium) over (partition by f.symbol, date_trunc('hour', f.collected_at)) as premium_per_hour,
    sum(f.contract_size) over (partition by f.symbol, date_trunc('hour', f.collected_at)) as contracts_per_hour
FROM trading.options_flow f
WHERE f.collected_at >= %(twenty_four_hours_ago)s
  AND f.symbol = ANY(%(symbols)s)
ORDER BY f.collected_at DESC
"""

# Fetch both datasets with the time parameter and symbols
print("Fetching dark pool trades from last 24 hours...")
trades_df = pd.read_sql_query(
    darkpool_query, engine, params={'twenty_four_hours_ago': twenty_four_hours_ago, 'symbols': SYMBOLS}
)

print("Fetching options flow data from last 24 hours...")
options_df = pd.read_sql_query(
    options_query, engine, params={'twenty_four_hours_ago': twenty_four_hours_ago, 'symbols': SYMBOLS}
)

# Process darkpool trades
trades_df['executed_at'] = pd.to_datetime(trades_df['executed_at'])
trades_df['collection_time'] = pd.to_datetime(trades_df['collection_time'])
trades_df['trade_hour'] = pd.to_datetime(trades_df['trade_hour'])

# Process options flow
options_df['collected_at'] = pd.to_datetime(options_df['collected_at'])
options_df['created_at'] = pd.to_datetime(options_df['created_at'])
options_df['expiry'] = pd.to_datetime(options_df['expiry'])
options_df['flow_hour'] = pd.to_datetime(options_df['flow_hour'])

# Create data directory
os.makedirs('data', exist_ok=True)

# Generate filenames with current timestamp
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
darkpool_filename = f'data/darkpool_trades_24h_{timestamp}.csv'
options_filename = f'data/options_flow_24h_{timestamp}.csv'

# Save both datasets
trades_df.to_csv(darkpool_filename, index=False)
options_df.to_csv(options_filename, index=False)

print(f"\nSaved {len(trades_df)} trades to {darkpool_filename}")
print(f"Saved {len(options_df)} option flows to {options_filename}")

# Print darkpool trade summary
print("\nDarkpool Trade summary by symbol:")
print(trades_df.groupby('symbol').agg({
    'size': ['count', 'sum', 'mean'],
    'premium': ['mean', 'max'],
    'price_impact_pct': 'mean'
}).round(2))

# Print options flow summary
print("\nOptions Flow summary by symbol:")
print(options_df.groupby('symbol').agg({
    'premium': ['count', 'sum', 'mean', 'max'],
    'contract_size': ['sum', 'mean'],
    'iv_rank': 'mean'
}).round(2))

# Print date ranges for both datasets
print("\nDate ranges:")
print("Darkpool Trades:")
print(f"Earliest trade: {trades_df['executed_at'].min()}")
print(f"Latest trade: {trades_df['executed_at'].max()}")
print(f"Total trades: {len(trades_df)}")
print(f"Total volume: {trades_df['size'].sum():,.0f}")

print("\nOptions Flow:")
print(f"Earliest flow: {options_df['collected_at'].min()}")
print(f"Latest flow: {options_df['collected_at'].max()}")
print(f"Total flows: {len(options_df)}")
print(f"Total premium: ${options_df['premium'].sum():,.2f}")

Fetching dark pool trades from last 24 hours...
Fetching options flow data from last 24 hours...

Saved 8519 trades to data/darkpool_trades_24h_20250509_215337.csv
Saved 0 option flows to data/options_flow_24h_20250509_215337.csv

Darkpool Trade summary by symbol:
        size                         premium             price_impact_pct
       count         sum     mean       mean         max             mean
symbol                                                                   
QQQ      819   3819097.0  4663.12  311620.82  27980000.0              0.0
SPY     7700  35682868.0  4634.14  318501.56  59680000.0              0.0

Options Flow summary by symbol:
Empty DataFrame
Columns: [(premium, count), (premium, sum), (premium, mean), (premium, max), (contract_size, sum), (contract_size, mean), (iv_rank, mean)]
Index: []

Date ranges:
Darkpool Trades:
Earliest trade: 2025-05-09 13:32:56+00:00
Latest trade: 2025-05-09 19:53:00+00:00
Total trades: 8519
Total volume: 39,501,965

Options F