# IBKR Analytics Database Analysis

This notebook connects to the same SQLite database used by the backend to enable interactive analysis of trading data.

## Database Overview

The database contains the following tables:
- **pnl_history**: Historical PnL records with net liquidation values
- **account_snapshots**: Account snapshots at different timestamps
- **positions**: Timestamped position snapshots (current and historical)
- **trades**: Trade execution records
- **performance_metrics**: Calculated performance metrics

In [1]:
# Setup: Add project root to Python path
import sys
from pathlib import Path
import os

# Locate the project root, i.e. the directory that holds the `backend`
# package imported by the next cell.
# In Jupyter notebooks, Path().resolve() gives the current working directory.
project_root = Path().resolve()
# If we're in the notebooks directory, go up one level
if project_root.name == 'notebooks':
    project_root = project_root.parent

# Fallback for kernels started somewhere else (e.g. a sub-folder of
# notebooks/): walk up the ancestors until one contains a `backend/`
# directory. If none does, the guess above is kept unchanged.
if not (project_root / 'backend').is_dir():
    for _candidate in project_root.parents:
        if (_candidate / 'backend').is_dir():
            project_root = _candidate
            break

project_root_str = str(project_root.resolve())

# Make `import backend...` work regardless of where the kernel was launched
if project_root_str not in sys.path:
    sys.path.insert(0, project_root_str)

# Change working directory to project root for consistent path resolution
# (relative paths such as a `./` SQLite file then resolve from the root)
os.chdir(project_root_str)

print(f"Project root: {project_root}")
print(f"Working directory: {os.getcwd()}")
print(f"Python path includes project root: {project_root_str in sys.path}")

Project root: /Users/zelin/Desktop/PA Investment/Invest_strategy
Python path includes: False


In [2]:
# Import necessary libraries
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from sqlalchemy import create_engine, inspect, func, desc
from sqlalchemy.orm import sessionmaker, Session

# Import backend modules
from backend.config import settings
from backend.models import (
    PnLHistory, AccountSnapshot, Position, Trade, PerformanceMetric, Base
)

print("Libraries imported successfully")
print(f"Database URL: {settings.database.url}")

Libraries imported successfully
Database URL: sqlite:///./ibkr_analytics.db


## Database Connection

Connect to the same database used by the backend using the shared configuration.

In [3]:
# Create database engine using the same configuration as backend.
# For SQLite, relative file paths are resolved against the project root so the
# notebook opens the same file no matter where the kernel was started.
db_url = settings.database.url
if db_url.startswith("sqlite"):
    # Split "<scheme>:///<path>[?<query>]" by hand so the driver suffix
    # (e.g. "sqlite+pysqlite") and any query string survive the rewrite.
    url_scheme, url_sep, url_rest = db_url.partition(":///")
    db_file, url_query_sep, url_query = url_rest.partition("?")

    # Only on-disk databases are rewritten. In-memory URLs ("sqlite://",
    # "sqlite:///:memory:") and "file:" URI forms are passed through as-is;
    # joining them onto project_root would turn them into a bogus file path.
    is_file_db = (
        bool(url_sep)
        and db_file not in ("", ":memory:")
        and not db_file.startswith("file:")
    )
    if is_file_db:
        # Resolve to absolute path from project root. If db_file is already
        # absolute, the `/` join keeps it and ignores project_root.
        db_path = (project_root / db_file).resolve()
        db_url = f"{url_scheme}:///{db_path}{url_query_sep}{url_query}"
        print(f"Resolved database path: {db_path}")
        if not db_path.exists():
            # Connecting to a missing SQLite file creates a new empty database,
            # which would show up below as "no tables" with no other hint.
            print(f"⚠ Database file not found; a new empty database will be created at: {db_path}")

    engine = create_engine(
        db_url,
        # Allow the connection to be used from threads other than its creator
        connect_args={"check_same_thread": False},
        echo=settings.database.echo,
    )
else:
    engine = create_engine(
        db_url,
        echo=settings.database.echo,
    )

# Create session factory
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# Test connection
with engine.connect() as conn:
    print("✓ Database connection successful!")
    print(f"✓ Database URL: {db_url}")

    # List all tables
    inspector = inspect(engine)
    tables = inspector.get_table_names()
    print(f"\n✓ Available tables: {', '.join(tables)}")

Resolved database path: /Users/zelin/Desktop/PA Investment/Invest_strategy/ibkr_analytics.db
✓ Database connection successful!
✓ Database URL: sqlite:////Users/zelin/Desktop/PA Investment/Invest_strategy/ibkr_analytics.db

✓ Available tables: account_snapshots, performance_metrics, pnl_history, positions, trades


## Helper Functions

Utility functions for common query patterns.

In [None]:
def query_to_df(query_result, model_class=None):
    """Convert SQLAlchemy query results to a pandas DataFrame.

    Args:
        query_result: A list of rows (e.g. from ``Query.all()``), a single row
            (e.g. from ``Query.first()``), or ``None``. Rows that expose a
            ``__dict__`` (such as ORM model instances) become one record each,
            built from their public attributes; any other row value is handed
            to pandas unchanged.
        model_class: Accepted for backward compatibility; currently unused.

    Returns:
        pandas.DataFrame: One row per record. Empty (no rows, no columns) when
        ``query_result`` is an empty list or ``None``.
    """
    def _to_record(row):
        # Attributes whose name starts with "_" are internal bookkeeping
        # (e.g. SQLAlchemy's per-instance state), not column data.
        if hasattr(row, '__dict__'):
            return {k: v for k, v in vars(row).items() if not k.startswith('_')}
        return row

    # `.first()` yields None when nothing matched; without this guard the
    # result would be a 1x1 frame holding None instead of an empty frame.
    if query_result is None:
        return pd.DataFrame()

    # Normalise the single-row case so both shapes share one code path
    rows = query_result if isinstance(query_result, list) else [query_result]
    if not rows:
        return pd.DataFrame()
    return pd.DataFrame([_to_record(row) for row in rows])

def get_session():
    """Open and return a new session from the notebook's ``SessionLocal`` factory."""
    session = SessionLocal()
    return session

print("Helper functions defined")

## PnL History Analysis

Query the `pnl_history` table, which contains net liquidation values and PnL breakdowns.

In [None]:
# Load the full PnL history, oldest record first, into `pnl_df`
with get_session() as db:
    pnl_query = db.query(PnLHistory).order_by(PnLHistory.date)
    pnl_records = pnl_query.all()
    # Convert while the session is still open
    pnl_df = query_to_df(pnl_records)

print(f"Total PnL records: {len(pnl_df)}")
if not pnl_df.empty:
    pnl_dates = pnl_df['date']
    print(f"\nDate range: {pnl_dates.min()} to {pnl_dates.max()}")
    print("\nFirst few records:")
    display(pnl_df.head())
    print("\nDataFrame info:")
    pnl_df.info()

In [None]:
# Get latest net liquidation value
def _format_usd(value):
    """Format a dollar amount as "$1,234.56", or "N/A" when the value is None."""
    # Applying the ",.2f" format spec to None raises TypeError, so a missing
    # value is reported instead of crashing the cell.
    return "N/A" if value is None else f"${value:,.2f}"

# Most recent PnL record: order newest-first and take the first row
with get_session() as db:
    latest_pnl = db.query(PnLHistory).order_by(desc(PnLHistory.date)).first()

if latest_pnl:
    print(f"Latest Net Liquidation: {_format_usd(latest_pnl.net_liquidation)}")
    print(f"Date: {latest_pnl.date}")
    print(f"Total PnL: {_format_usd(latest_pnl.total_pnl)}")
    print(f"Realized PnL: {_format_usd(latest_pnl.realized_pnl)}")
    print(f"Unrealized PnL: {_format_usd(latest_pnl.unrealized_pnl)}")
else:
    # `.first()` returned None: the table has no rows
    print("No PnL records found")

In [None]:
# Select the PnL records that fall inside a date window
start_date = datetime.now() - timedelta(days=30)  # Last 30 days
end_date = datetime.now()

with get_session() as db:
    # Both bounds are inclusive
    date_window = (PnLHistory.date >= start_date, PnLHistory.date <= end_date)
    recent_pnl = (
        db.query(PnLHistory)
        .filter(*date_window)
        .order_by(PnLHistory.date)
        .all()
    )
    recent_pnl_df = query_to_df(recent_pnl)

if recent_pnl_df.empty:
    print("No records found in the specified date range")
else:
    print(f"Records in last 30 days: {len(recent_pnl_df)}")
    display(recent_pnl_df)

In [None]:
# Derive period-over-period and cumulative returns from net liquidation.
# NOTE(review): returns are computed over all rows in date order; if
# pnl_history holds several accounts or more than one row per day, these are
# not per-account daily returns — confirm against the data.
if not pnl_df.empty and 'net_liquidation' in pnl_df.columns:
    pnl_df = (
        pnl_df
        .assign(date=pd.to_datetime(pnl_df['date']))
        .sort_values('date')
        .assign(
            # Relative change versus the previous row (first row is NaN)
            daily_return=lambda frame: frame['net_liquidation'].pct_change(),
            # Compounded growth since the first row
            cumulative_return=lambda frame: (1 + frame['daily_return']).cumprod() - 1,
        )
    )

    daily_returns = pnl_df['daily_return']
    print("Daily Returns Analysis:")
    print(f"Average daily return: {daily_returns.mean():.4%}")
    print(f"Std dev of daily returns: {daily_returns.std():.4%}")
    print("\nRecent returns:")
    return_columns = ['date', 'net_liquidation', 'daily_return', 'cumulative_return']
    display(pnl_df[return_columns].tail(10))

## Account Snapshots

Query account snapshots to see account values over time.

In [None]:
# Load every account snapshot, oldest first, into `snapshots_df`
with get_session() as db:
    snapshot_query = db.query(AccountSnapshot).order_by(AccountSnapshot.timestamp)
    snapshots = snapshot_query.all()
    snapshots_df = query_to_df(snapshots)

print(f"Total account snapshots: {len(snapshots_df)}")
if not snapshots_df.empty:
    snapshot_times = snapshots_df['timestamp']
    print(f"\nDate range: {snapshot_times.min()} to {snapshot_times.max()}")
    print("\nLatest snapshot:")
    # Rows are in ascending timestamp order, so the last row is the newest
    display(snapshots_df.iloc[-1:])
    print("\nAll columns:")
    print(snapshots_df.columns.tolist())

In [None]:
# Fetch the most recent account snapshot and print its headline values
with get_session() as db:
    latest_snapshot = (
        db.query(AccountSnapshot)
        .order_by(desc(AccountSnapshot.timestamp))
        .first()
    )

if not latest_snapshot:
    print("No account snapshots found")
else:
    print("Latest Account Snapshot:")
    print(f"Account ID: {latest_snapshot.account_id}")
    print(f"Timestamp: {latest_snapshot.timestamp}")

    net_liq = latest_snapshot.net_liquidation
    total_cash = latest_snapshot.total_cash_value
    buying_power = latest_snapshot.buying_power
    equity = latest_snapshot.equity

    snapshot_amounts = (
        ("Net Liquidation", net_liq),
        ("Total Cash Value", total_cash),
        ("Buying Power", buying_power),
        ("Equity", equity),
    )
    for amount_label, amount in snapshot_amounts:
        # A missing value (None) cannot be formatted as a number
        if amount is None:
            print(f"{amount_label}: N/A")
        else:
            print(f"{amount_label}: ${amount:,.2f}")

## Positions Analysis

Analyze current and historical positions.

In [None]:
# Load every position record, newest snapshot first, into `positions_df`
with get_session() as db:
    newest_position_first = desc(Position.timestamp)
    positions = db.query(Position).order_by(newest_position_first).all()
    positions_df = query_to_df(positions)

print(f"Total position records: {len(positions_df)}")
if not positions_df.empty:
    symbol_count = positions_df['symbol'].nunique()
    print(f"\nUnique symbols: {symbol_count}")
    print("\nLatest positions:")
    display(positions_df.head(10))

In [None]:
# Keep only the most recent record of each symbol and summarise it.
# NOTE(review): a symbol that is no longer held still contributes its last
# recorded row here — confirm whether closed positions are stored with zero
# quantity/value before trusting the totals.
if not positions_df.empty:
    positions_df['timestamp'] = pd.to_datetime(positions_df['timestamp'])

    # After sorting by time, the last row of every symbol group is its latest
    positions_by_time = positions_df.sort_values('timestamp')
    latest_positions = positions_by_time.groupby('symbol').tail(1)

    position_columns = [
        'symbol', 'quantity', 'avg_cost', 'market_price',
        'market_value', 'unrealized_pnl', 'timestamp',
    ]
    print("Latest Position for Each Symbol:")
    display(latest_positions[position_columns])

    # Summary statistics
    total_market_value = latest_positions['market_value'].sum()
    print(f"\nTotal Market Value: ${total_market_value:,.2f}")
    total_unrealized_pnl = latest_positions['unrealized_pnl'].sum()
    print(f"Total Unrealized PnL: ${total_unrealized_pnl:,.2f}")

## Trades Analysis

Query trade execution history.

In [None]:
# Load every trade execution, most recent first, into `trades_df`
with get_session() as db:
    newest_trade_first = desc(Trade.exec_time)
    trades = db.query(Trade).order_by(newest_trade_first).all()
    trades_df = query_to_df(trades)

print(f"Total trades: {len(trades_df)}")
if not trades_df.empty:
    exec_times = trades_df['exec_time']
    print(f"\nDate range: {exec_times.min()} to {exec_times.max()}")
    print("\nRecent trades:")
    display(trades_df.head(10))

In [None]:
# Summarise traded volume, average price and commission per symbol
if not trades_df.empty:
    trades_df['exec_time'] = pd.to_datetime(trades_df['exec_time'])

    # One row per symbol; named aggregation yields the flat column names directly
    per_symbol = trades_df.groupby('symbol').agg(
        Total_Shares=('shares', 'sum'),
        Trade_Count=('shares', 'count'),
        Avg_Price=('price', 'mean'),
        Total_Commission=('commission', 'sum'),
    )
    trade_summary = per_symbol.round(2)
    # Most heavily traded symbols first
    trade_summary = trade_summary.sort_values('Total_Shares', ascending=False)

    print("Trade Summary by Symbol:")
    display(trade_summary)
    # Trade count by side
    print("\nTrade Count by Side:")
    side_counts = trades_df['side'].value_counts()
    display(side_counts)

## Performance Metrics

Analyze calculated performance metrics.

In [None]:
# Load every performance-metric record, newest date first, into `metrics_df`
with get_session() as db:
    newest_metric_first = desc(PerformanceMetric.date)
    metrics = db.query(PerformanceMetric).order_by(newest_metric_first).all()
    metrics_df = query_to_df(metrics)

print(f"Total performance metric records: {len(metrics_df)}")
if not metrics_df.empty:
    metric_dates = metrics_df['date']
    print(f"\nDate range: {metric_dates.min()} to {metric_dates.max()}")
    print("\nLatest metrics:")
    display(metrics_df.head(10))

In [None]:
# Print the headline numbers of the most recent performance-metric record
if not metrics_df.empty:
    latest_metrics = metrics_df.iloc[0]  # Already sorted by date desc

    print("Latest Performance Metrics:")
    print(f"Date: {latest_metrics['date']}")

    # (label, column, format spec) for every metric that may be missing
    metric_formats = (
        ("Daily Return", 'daily_return', ".4%"),
        ("Cumulative Return", 'cumulative_return', ".4%"),
        ("Sharpe Ratio", 'sharpe_ratio', ".2f"),
        ("Sortino Ratio", 'sortino_ratio', ".2f"),
        ("Max Drawdown", 'max_drawdown', ".4%"),
        ("Win Rate", 'win_rate', ".2%"),
    )
    for metric_label, metric_column, metric_spec in metric_formats:
        metric_value = latest_metrics[metric_column]
        # NaN/None cannot be formatted as a number; report it as missing
        if pd.notna(metric_value):
            print(f"{metric_label}: {metric_value:{metric_spec}}")
        else:
            print(f"{metric_label}: N/A")

    print(f"Total Trades: {latest_metrics['total_trades']}")

## Custom Analysis

Use the cells below to write your own custom queries and analysis.

In [None]:
# Example: Query specific account_id
# account_id = "YOUR_ACCOUNT_ID"
# with get_session() as db:
#     account_pnl = db.query(PnLHistory).filter(
#         PnLHistory.account_id == account_id
#     ).order_by(PnLHistory.date).all()
#     account_pnl_df = query_to_df(account_pnl)
#     display(account_pnl_df)

In [None]:
# Example: Raw SQL query using pandas
# query = "SELECT * FROM pnl_history WHERE date >= '2024-01-01' ORDER BY date"
# df = pd.read_sql_query(query, engine)
# display(df)

In [None]:
# Example: Aggregate queries
# with get_session() as db:
#     result = db.query(
#         func.count(PnLHistory.id).label('count'),
#         func.avg(PnLHistory.net_liquidation).label('avg_net_liq'),
#         func.max(PnLHistory.net_liquidation).label('max_net_liq'),
#         func.min(PnLHistory.net_liquidation).label('min_net_liq')
#     ).first()
#     print(f"Count: {result.count}")
#     print(f"Avg Net Liquidation: ${result.avg_net_liq:,.2f}")
#     print(f"Max Net Liquidation: ${result.max_net_liq:,.2f}")
#     print(f"Min Net Liquidation: ${result.min_net_liq:,.2f}")

## Visualization Examples

Example code for creating visualizations (requires matplotlib or plotly).

In [None]:
# Example: Plot net liquidation over time
# import matplotlib.pyplot as plt
# 
# if not pnl_df.empty and 'net_liquidation' in pnl_df.columns:
#     pnl_df['date'] = pd.to_datetime(pnl_df['date'])
#     pnl_df = pnl_df.sort_values('date')
#     
#     plt.figure(figsize=(12, 6))
#     plt.plot(pnl_df['date'], pnl_df['net_liquidation'])
#     plt.title('Net Liquidation Over Time')
#     plt.xlabel('Date')
#     plt.ylabel('Net Liquidation ($)')
#     plt.grid(True)
#     plt.xticks(rotation=45)
#     plt.tight_layout()
#     plt.show()

In [None]:
# Example: Plot PnL breakdown
# import plotly.graph_objects as go
# 
# if not pnl_df.empty:
#     fig = go.Figure()
#     fig.add_trace(go.Scatter(x=pnl_df['date'], y=pnl_df['realized_pnl'], 
#                              name='Realized PnL', mode='lines'))
#     fig.add_trace(go.Scatter(x=pnl_df['date'], y=pnl_df['unrealized_pnl'], 
#                              name='Unrealized PnL', mode='lines'))
#     fig.add_trace(go.Scatter(x=pnl_df['date'], y=pnl_df['total_pnl'], 
#                              name='Total PnL', mode='lines'))
#     fig.update_layout(title='PnL Breakdown Over Time', 
#                       xaxis_title='Date', 
#                       yaxis_title='PnL ($)')
#     fig.show()