# DefInd to PnL Full Pipeline

This notebook demonstrates the complete pipeline from DefInd indexed data to PnL calculations.

In [1]:
# Setup and imports
import sys
sys.path.append('../src')
sys.path.append('../../src')  # For defind

import pandas as pd
import numpy as np
import duckdb
from pathlib import Path
from datetime import datetime, timezone

# DefInd imports
from defind.storage.manifest import LiveManifest
from defind.storage.shards import ShardAggregator

# Define imports
from define.config import load_settings
from define.adapters import DuckDBRepository
from define.application import PNLEngine, to_daily
from define.domain import Pool, Token, PositionStatic, PositionEventState

print("‚úÖ All imports successful!")

ImportError: cannot import name 'ShardAggregator' from 'defind.storage.shards' (/home/youssef/define/src/defind/storage/shards.py)

In [3]:
# Step 1: Explore DefInd Data
print("üîç Exploring DefInd Data")
print("=" * 40)

# Look for DefInd data files
data_paths = [
    Path("../data"),
    Path("../../data"),
    Path("../src/defind/data/shards"),
    Path("../../src/defind/data/shards")
]

found_data = []
for path in data_paths:
    if path.exists():
        parquet_files = list(path.glob("*.parquet"))
        if parquet_files:
            found_data.extend(parquet_files)
            print(f"Found {len(parquet_files)} parquet files in {path}")

if found_data:
    print(f"\nTotal parquet files found: {len(found_data)}")
    for f in found_data[:5]:  # Show first 5
        print(f"  - {f.name} ({f.stat().st_size / 1024:.1f} KB)")
else:
    print("‚ö†Ô∏è No DefInd parquet files found. You may need to run DefInd first:")
    print("   defind fetch-logs --registry nfpm --start-block 18000000 --end-block 18001000")

üîç Exploring DefInd Data
‚ö†Ô∏è No DefInd parquet files found. You may need to run DefInd first:
   defind fetch-logs --registry nfpm --start-block 18000000 --end-block 18001000


In [None]:
# Step 2: Load and Examine DefInd Data
if found_data:
    print("\nüìä Loading DefInd Data")
    print("=" * 40)
    
    # Load the first parquet file to examine structure
    sample_file = found_data[0]
    df = pd.read_parquet(sample_file)
    
    print(f"Loaded {len(df)} rows from {sample_file.name}")
    print(f"Columns: {list(df.columns)}")
    print(f"\nSample data:")
    print(df.head())
    
    # Check for NFPM events
    if 'event_name' in df.columns:
        event_counts = df['event_name'].value_counts()
        print(f"\nEvent types found:")
        print(event_counts)
        
        # Focus on NFPM events for position tracking
        nfpm_events = ['Mint', 'Burn', 'IncreaseLiquidity', 'DecreaseLiquidity', 'Collect', 'Transfer']
        relevant_events = df[df['event_name'].isin(nfpm_events)] if 'event_name' in df.columns else df
        print(f"\nRelevant NFPM events: {len(relevant_events)}")
    else:
        relevant_events = df
        print("\nNo event_name column found, using all data")
else:
    print("\n‚ö†Ô∏è Skipping data loading - no files found")
    relevant_events = pd.DataFrame()

In [None]:
# Step 3: Create DuckDB Database from DefInd Data
if not relevant_events.empty:
    print("\nüóÑÔ∏è Creating DuckDB Database")
    print("=" * 40)
    
    # Create DuckDB database
    db_path = "defind_pnl_test.duckdb"
    con = duckdb.connect(db_path)
    
    # Load all parquet files into DuckDB
    print("Loading parquet files into DuckDB...")
    
    all_data = []
    for file in found_data:
        df_temp = pd.read_parquet(file)
        all_data.append(df_temp)
    
    combined_df = pd.concat(all_data, ignore_index=True)
    print(f"Combined data: {len(combined_df)} rows")
    
    # Register with DuckDB
    con.register("raw_events", combined_df)
    
    # Create the tables expected by our PnL engine
    print("Creating structured tables...")
    
    # This is a simplified transformation - you'll need to adapt based on your actual DefInd schema
    con.execute("""
        CREATE OR REPLACE TABLE pool_snapshots AS
        SELECT 
            address as pool_id,
            block_number,
            block_timestamp as ts,
            CAST(1000000000000000000000000 AS BIGINT) as sqrtPriceX96,  -- Mock value
            0 as tick,  -- Mock value
            CAST(0 AS BIGINT) as fg0_x128,  -- Mock value
            CAST(0 AS BIGINT) as fg1_x128,  -- Mock value
            CAST(0 AS BIGINT) as lower_fg0_out_x128,  -- Mock value
            CAST(0 AS BIGINT) as lower_fg1_out_x128,  -- Mock value
            CAST(0 AS BIGINT) as upper_fg0_out_x128,  -- Mock value
            CAST(0 AS BIGINT) as upper_fg1_out_x128   -- Mock value
        FROM raw_events 
        GROUP BY address, block_number, block_timestamp
    """)
    
    con.execute("""
        CREATE OR REPLACE TABLE pool_prices AS
        SELECT 
            address as pool_id,
            block_timestamp as ts,
            1.0 as token0_usd,    -- Mock USDC price
            2000.0 as token1_usd  -- Mock WETH price
        FROM raw_events 
        GROUP BY address, block_timestamp
    """)
    
    print("‚úÖ DuckDB tables created")
    
    # Show table sizes
    snapshots_count = con.execute("SELECT COUNT(*) FROM pool_snapshots").fetchone()[0]
    prices_count = con.execute("SELECT COUNT(*) FROM pool_prices").fetchone()[0]
    print(f"Pool snapshots: {snapshots_count}")
    print(f"Pool prices: {prices_count}")
    
    con.close()
else:
    print("\n‚ö†Ô∏è Skipping database creation - no data available")

In [None]:
# Step 4: Test PnL Engine with Real Data Structure
if not relevant_events.empty:
    print("\nüöÄ Testing PnL Engine with DefInd Data")
    print("=" * 40)
    
    # Load settings and create repository
    settings = load_settings(duckdb_path="defind_pnl_test.duckdb")
    tables = {
        "pool_snapshots": "pool_snapshots",
        "pool_prices": "pool_prices",
        "position_lifecycle_active": "position_lifecycle_active",  # We'll create this
        "position_lifecycle_closed": "position_lifecycle_closed",
        "position_state_last": "position_state_last"
    }
    
    # Create mock position tables for testing
    con = duckdb.connect("defind_pnl_test.duckdb")
    
    # Create mock active positions
    con.execute("""
        CREATE OR REPLACE TABLE position_lifecycle_active AS
        SELECT 
            12345 as token_id,
            (SELECT pool_id FROM pool_snapshots LIMIT 1) as pool_id,
            -1000 as tick_lower,
            1000 as tick_upper,
            CURRENT_TIMESTAMP as open_time,
            NULL as close_time,
            0 as open_tick,
            NULL as close_tick,
            1000.0 as entry_amount0,
            0.5 as entry_amount1,
            1000000 as remaining_liquidity
    """)
    
    con.execute("""
        CREATE OR REPLACE TABLE position_lifecycle_closed AS
        SELECT * FROM position_lifecycle_active WHERE 1=0  -- Empty table
    """)
    
    con.execute("""
        CREATE OR REPLACE TABLE position_state_last AS
        SELECT 
            12345 as token_id,
            1000000 as liquidity,
            CAST(100000000000000000000000000000000000000 AS BIGINT) as fee_growth_inside0_last_x128,
            CAST(50000000000000000000000000000000000000 AS BIGINT) as fee_growth_inside1_last_x128,
            10 as tokens_owed0,
            5 as tokens_owed1
    """)
    
    con.close()
    
    print("‚úÖ Mock position tables created")
else:
    print("\n‚ö†Ô∏è Skipping PnL engine test - no data available")

In [None]:
# Step 5: Run Full Pipeline
if not relevant_events.empty:
    print("\nüîÑ Running Full DefInd ‚Üí PnL Pipeline")
    print("=" * 40)
    
    try:
        # Create repository and engine
        repo = DuckDBRepository("defind_pnl_test.duckdb", tables=tables)
        engine = PNLEngine(pools=repo, positions=repo, prices=repo, rewards=repo, storage=repo)
        
        # Get a pool ID from our data
        con = duckdb.connect("defind_pnl_test.duckdb")
        pool_id = con.execute("SELECT pool_id FROM pool_snapshots LIMIT 1").fetchone()[0]
        con.close()
        
        # Create pool object
        pool = Pool(
            id=pool_id,
            token0=Token("0xa0b86a33e6441e6c7d3d0b4f4b1b8b8b8b8b8b8b", "USDC", 6),
            token1=Token("0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2", "WETH", 18),
            fee=500
        )
        
        # Get time range from data
        con = duckdb.connect("defind_pnl_test.duckdb")
        time_range = con.execute("SELECT MIN(ts), MAX(ts) FROM pool_snapshots").fetchone()
        start_ts, end_ts = int(time_range[0]), int(time_range[1])
        con.close()
        
        print(f"Computing PnL for pool {pool_id}")
        print(f"Time range: {datetime.fromtimestamp(start_ts)} to {datetime.fromtimestamp(end_ts)}")
        
        # Compute intraday PnL
        intraday_df = engine.compute_intraday(pool, start_ts=start_ts, end_ts=end_ts)
        
        print(f"\n‚úÖ Computed {len(intraday_df)} intraday PnL rows")
        
        if not intraday_df.empty:
            print("\nIntraday PnL columns:")
            print(list(intraday_df.columns))
            
            print("\nSample intraday data:")
            print(intraday_df[['token_id', 'timestamp', 'value_total_usd', 'il_usd']].head())
            
            # Aggregate to daily
            daily_df = to_daily(intraday_df)
            print(f"\n‚úÖ Aggregated to {len(daily_df)} daily rows")
            
            if not daily_df.empty:
                print("\nDaily aggregated data:")
                print(daily_df)
        else:
            print("‚ö†Ô∏è No intraday data generated - check position/snapshot alignment")
            
    except Exception as e:
        print(f"‚ùå Error running pipeline: {e}")
        import traceback
        traceback.print_exc()
else:
    print("\n‚ö†Ô∏è Cannot run pipeline - no DefInd data found")
    print("\nTo get DefInd data, run:")
    print("  cd ../src/defind")
    print("  defind fetch-logs --registry nfpm --start-block 18000000 --end-block 18001000")

In [None]:
# Step 6: Analysis and Visualization
print("\nüìà Analysis Summary")
print("=" * 40)

if 'intraday_df' in locals() and not intraday_df.empty:
    print("\nüéâ Full Pipeline Success!")
    print("\nPipeline Summary:")
    print(f"  ‚Ä¢ DefInd data files processed: {len(found_data) if found_data else 0}")
    print(f"  ‚Ä¢ Total events loaded: {len(combined_df) if 'combined_df' in locals() else 0}")
    print(f"  ‚Ä¢ Pool snapshots created: {snapshots_count if 'snapshots_count' in locals() else 0}")
    print(f"  ‚Ä¢ Price points created: {prices_count if 'prices_count' in locals() else 0}")
    print(f"  ‚Ä¢ Intraday PnL rows: {len(intraday_df)}")
    print(f"  ‚Ä¢ Daily PnL rows: {len(daily_df) if 'daily_df' in locals() else 0}")
    
    print("\n‚úÖ The DefInd ‚Üí PnL pipeline is working!")
    
    # Show key metrics
    if len(intraday_df) > 0:
        total_value = intraday_df['value_total_usd'].iloc[-1]
        total_il = intraday_df['il_usd'].iloc[-1]
        print(f"\nFinal Position Metrics:")
        print(f"  ‚Ä¢ Total Value: ${total_value:.2f}")
        print(f"  ‚Ä¢ Impermanent Loss: ${total_il:.2f}")
else:
    print("\n‚ö†Ô∏è Pipeline incomplete - missing DefInd data")
    print("\nNext steps:")
    print("1. Run DefInd to index some blockchain events")
    print("2. Re-run this notebook to see the full pipeline")
    print("3. The PnL engine is ready and waiting for data!")

print("\nüöÄ DefInd + Define integration complete!")