# Load CSV Fundamental Data into Zipline Custom Database

This notebook demonstrates how to:
1. Load fundamental data from CSV files
2. Map symbols to Zipline SIDs
3. Create a custom SQLite database
4. Use the data in Zipline Pipeline

This is a zipline-reloaded native approach (no QuantRocket dependencies).

## 1. Setup and Imports

In [None]:
import os
import glob
import sqlite3
import pandas as pd
import numpy as np
from pathlib import Path

# Zipline imports
from zipline.data.bundles import load as load_bundle, register
from zipline.data.bundles.sharadar_bundle import sharadar_bundle
from zipline.pipeline import Pipeline
from zipline.pipeline.data.db import Database, Column

# Make sure the 'sharadar' bundle is known to Zipline even when extension.py
# was not loaded. Registration is best-effort: any exception is reported on
# stdout and the notebook carries on.
try:
    sharadar_ingest = sharadar_bundle(
        tickers=None,
        incremental=True,
        include_funds=True,
    )
    register('sharadar', sharadar_ingest)
except Exception as registration_error:
    # NOTE(review): every exception lands here, so a genuine registration
    # failure is also reported as "already registered" - read the message.
    print(f"âœ“ Sharadar bundle already registered (or error: {registration_error})")
else:
    print("âœ“ Registered Sharadar bundle")

# Pandas display options used by every table shown in this notebook.
for option_name, option_value in (
    ('display.max_columns', None),
    ('display.max_rows', 50),
    ('display.width', 120),
):
    pd.set_option(option_name, option_value)

print("âœ“ Imports complete")

## 2. Configuration

Set your database name and data directory paths.

In [None]:
# Configuration: everything a reader is expected to tune lives in this cell.
DATABASE_NAME = "refe-fundamentals"  # Name of the custom database (also used as the Database CODE)
DATA_DIR = "/data/csv/"  # Directory with CSV files (persistent across Docker restarts)
VIX_SIGNAL_PATH = "/data/csv/vix_flag.csv"  # Optional VIX signal data

# The SQLite file lives under ~/.zipline/data/custom/; create the folder up front.
DB_DIR = Path.home().joinpath('.zipline', 'data', 'custom')
DB_DIR.mkdir(parents=True, exist_ok=True)
DB_PATH = DB_DIR / (DATABASE_NAME + ".sqlite")

# Database update mode (the value currently selected below is 'replace'):
#   'fresh'   - drop and recreate the database
#   'replace' - insert or replace existing records (updates duplicates)
#   'ignore'  - insert or ignore (skips duplicates, keeps existing data)
UPDATE_MODE = 'replace'

# Echo the effective configuration so the run log is self-describing.
config_report = (
    f"Database will be created at: {DB_PATH}",
    f"Update mode: {UPDATE_MODE}",
    "  - 'fresh': Drop and recreate database",
    "  - 'replace': Update existing records with new data",
    "  - 'ignore': Skip records that already exist",
    f"\nLooking for CSV files in: {DATA_DIR}",
    "\nðŸ’¡ Tip: Place your CSV files in /data/csv/ (inside container)",
    "   or ./data/csv/ (on host machine) for persistent storage",
)
for report_line in config_report:
    print(report_line)

## 3. Define Database Schema

Define the columns that will be in your custom database.

In [None]:
# Define your database schema
# This matches the columns from the QuantRocket example
#
# SCHEMA maps each column name to the SQLite type name used for it. The
# database cell builds both the CREATE TABLE column list and the INSERT column
# list by iterating this dict, so key order is the table's column order.
# (Sid, Date) is declared UNIQUE when the table is created.
# Any key that is absent from the loaded data is added at insert time with a
# default ('' for TEXT, 0 otherwise).
# NOTE(review): keys presumably have to match the CSV headers exactly - confirm
# before renaming anything. The misspelled keys ('EPS_SurpirsePrct_prev_Q',
# 'ReturnOnEquity_SmartEstimat') are also used verbatim as attribute names on
# the REFEFundamentals class, so they must be renamed in both places together.
SCHEMA = {
    'Symbol': 'TEXT',
    'Sid': 'INTEGER',  # filled by the symbol -> SID mapping cell
    'Date': 'TEXT',  # written as 'YYYY-MM-DD' strings at insert time
    'RefPriceClose': 'REAL',
    'RefVolume': 'REAL',
    'CompanyCommonName': 'TEXT',
    'EnterpriseValue_DailyTimeSeries_': 'REAL',
    'CompanyMarketCap': 'REAL',
    'GICSSectorName': 'TEXT',
    'FOCFExDividends_Discrete': 'REAL',
    'InterestExpense_NetofCapitalizedInterest': 'REAL',
    'Debt_Total': 'REAL',
    'EarningsPerShare_Actual': 'REAL',
    'EarningsPerShare_SmartEstimate_prev_Q': 'REAL',
    'EarningsPerShare_ActualSurprise': 'REAL',
    'EarningsPerShare_SmartEstimate_current_Q': 'REAL',
    'LongTermGrowth_Mean': 'REAL',
    'PriceTarget_Median': 'REAL',
    'CombinedAlphaModelSectorRank': 'REAL',
    'CombinedAlphaModelSectorRankChange': 'REAL',
    'CombinedAlphaModelRegionRank': 'REAL',
    'TradeDate': 'TEXT',  # stored in the table but not exposed on REFEFundamentals
    'EPS_SurpirsePrct_prev_Q': 'REAL',
    'Estpricegrowth_percent': 'REAL',
    'CashFlowComponent_Current': 'REAL',
    'EarningsQualityRegionRank_Current': 'REAL',
    'EnterpriseValueToEBIT_DailyTimeSeriesRatio_': 'REAL',
    'EnterpriseValueToEBITDA_DailyTimeSeriesRatio_': 'REAL',
    'EnterpriseValueToSales_DailyTimeSeriesRatio_': 'REAL',
    'Dividend_Per_Share_SmartEstimate': 'REAL',
    'CashFlowPerShare_BrokerEstimate': 'REAL',
    'FreeCashFlowPerShare_BrokerEstimate': 'REAL',
    'ForwardPEG_DailyTimeSeriesRatio_': 'REAL',
    'PriceEarningsToGrowthRatio_SmartEstimate_': 'REAL',
    'ReturnOnInvestedCapital_BrokerEstimate': 'REAL',
    'Recommendation_NumberOfTotal': 'REAL',
    'Recommendation_Median_1_5_': 'REAL',
    'Recommendation_NumberOfStrongBuy': 'REAL',
    'Recommendation_NumberOfBuy': 'REAL',
    'Recommendation_Mean_1_5_': 'REAL',
    'ReturnOnCapitalEmployed_Actual': 'REAL',
    'GrossProfitMargin_': 'REAL',
    'ReturnOnEquity_SmartEstimat': 'REAL',
    'ReturnOnAssets_SmartEstimate': 'REAL',
    'CashCashEquivalents_Total': 'REAL',
    'ForwardPriceToCashFlowPerShare_DailyTimeSeriesRatio_': 'REAL',
    'ForwardPriceToSalesPerShare_DailyTimeSeriesRatio_': 'REAL',
    'ForwardEnterpriseValueToOperatingCashFlow_DailyTimeSeriesRatio_': 'REAL',
    'GrossProfitMargin_ActualSurprise': 'REAL',
    'pred': 'REAL',  # VIX signal, merged in from VIX_SIGNAL_PATH when that file exists
}

print(f"âœ“ Schema defined with {len(SCHEMA)} columns")

## 4. Load CSV Files

Load all CSV files from the data directory and concatenate them.

In [None]:
# Find all fundamentals CSV files in DATA_DIR.
# - The working directory is left untouched (no os.chdir), so this cell has no
#   hidden effect on relative paths used elsewhere.
# - The optional VIX signal file lives in the same directory but has a
#   different layout; it is excluded here and merged in its own section.
#   Loading it as fundamentals would add a stray 'pred' column and rows
#   without a Symbol.
vix_signal_file = os.path.abspath(VIX_SIGNAL_PATH)
csv_paths = sorted(
    path for path in glob.glob(os.path.join(DATA_DIR, '*.csv'))
    if os.path.abspath(path) != vix_signal_file
)
csv_files = [os.path.basename(path) for path in csv_paths]

print(f"Found {len(csv_files)} CSV files:")
for f in csv_files[:5]:  # Show first 5
    print(f"  - {f}")
if len(csv_files) > 5:
    print(f"  ... and {len(csv_files) - 5} more")

# Fail early with a clear message instead of a KeyError on 'Date' further down.
if not csv_files:
    raise FileNotFoundError(f"No fundamentals CSV files found in {DATA_DIR}")

# Read every file, then concatenate once. Concatenating inside the loop
# re-copies the accumulated frame on every iteration (quadratic cost).
print("\nLoading CSV files...")
csv_frames = []
for csv_file in csv_files:
    print(f"  Loading {csv_file}...")
    csv_frames.append(pd.read_csv(os.path.join(DATA_DIR, csv_file)))

custom_data = pd.concat(csv_frames, ignore_index=True)

print(f"\nâœ“ Loaded {len(custom_data):,} total rows")
print(f"Date range: {custom_data['Date'].min()} to {custom_data['Date'].max()}")
print(f"Unique symbols: {custom_data['Symbol'].nunique()}")

# Show sample
print("\nSample data:")
custom_data.head()

## 5. Optional: Load Recent Data Only

To reduce memory usage, you can filter to recent data only.

In [None]:
# Optional: Keep only recent data (e.g., last 600,000 rows)
# Comment out if you want all historical data
RECENT_ROWS = 600000

if len(custom_data) > RECENT_ROWS:
    print(f"Filtering to most recent {RECENT_ROWS:,} rows...")
    # tail() picks rows by position, and the frame is in file order, which is
    # not guaranteed to be chronological. Order the rows by their parsed Date
    # first so the rows kept really are the most recent ones. The sort is
    # stable (ties keep file order) and rows with a missing Date go first, so
    # they are the first to be dropped.
    parsed_dates = pd.to_datetime(custom_data['Date']).reset_index(drop=True)
    chronological = parsed_dates.sort_values(kind='mergesort', na_position='first').index
    custom_data = custom_data.iloc[chronological].tail(RECENT_ROWS).copy()
    print(f"âœ“ Filtered. New date range: {custom_data['Date'].min()} to {custom_data['Date'].max()}")
else:
    print(f"Dataset has {len(custom_data):,} rows - no filtering needed")

## 6. Map Symbols to Zipline SIDs

Map your symbols to Zipline Security IDs (SIDs) using the asset finder.

In [None]:
# Open the Sharadar bundle as of "now" to get at its asset database.
print("Loading Sharadar bundle...")

bundle_timestamp = pd.Timestamp.now(tz='UTC')
bundle_data = load_bundle('sharadar', timestamp=bundle_timestamp)
asset_finder = bundle_data.asset_finder

# Build the symbol -> sid lookup from every asset the bundle knows about.
print("Mapping symbols to SIDs...")
all_assets = asset_finder.retrieve_all(asset_finder.sids)

# NOTE(review): when two assets share a symbol, the one retrieved last wins.
# Presumably this matters for reused tickers - confirm against the bundle.
symbol_to_sid = {
    asset.symbol: asset.sid
    for asset in all_assets
    if hasattr(asset, 'symbol')
}

print(f"âœ“ Found {len(symbol_to_sid):,} symbols in bundle")

# Attach a Sid to every row; symbols missing from the lookup become NaN.
custom_data['Sid'] = custom_data['Symbol'].map(symbol_to_sid)

# Summarise how well the mapping went.
has_sid = custom_data['Sid'].notna()
mapped = has_sid.sum()
unmapped = (~has_sid).sum()
rows_before_filter = len(custom_data)

print(f"\nMapping results:")
print(f"  Mapped: {mapped:,} rows ({mapped/rows_before_filter*100:.1f}%)")
print(f"  Unmapped: {unmapped:,} rows ({unmapped/rows_before_filter*100:.1f}%)")

if unmapped > 0:
    unmapped_symbols = custom_data.loc[~has_sid, 'Symbol'].unique()
    print(f"\n  Unmapped symbols (first 10): {list(unmapped_symbols[:10])}")
    print(f"  Tip: These symbols may not be in the Sharadar bundle")

# Rows without a Sid cannot be stored, so only mapped rows are kept.
custom_data = custom_data[has_sid].copy()
print(f"\nâœ“ Kept {len(custom_data):,} mapped rows")

## 7. Merge VIX Signal Data (Optional)

If you have additional data like VIX signals, merge it here.

In [None]:
# Load VIX signal data if available and attach it as the 'pred' column.
if os.path.exists(VIX_SIGNAL_PATH):
    print(f"Loading VIX signal from {VIX_SIGNAL_PATH}...")

    # Standardize column names so the merge keys match custom_data.
    vix_signal = pd.read_csv(VIX_SIGNAL_PATH).rename(
        columns={'symbol': 'Symbol', 'date': 'Date'}
    )
    vix_signal['Date'] = pd.to_datetime(vix_signal['Date'])

    # Keep one signal per (Symbol, Date). Duplicate keys would multiply rows
    # in the left merge and later violate the UNIQUE(Sid, Date) constraint.
    vix_signal = vix_signal.drop_duplicates(subset=['Symbol', 'Date'], keep='last')

    # Merge with custom data
    custom_data['Date'] = pd.to_datetime(custom_data['Date'])

    # If 'pred' already exists (the cell was re-run, or the VIX file was
    # picked up together with the fundamentals CSVs), the merge would produce
    # 'pred_x'/'pred_y' and no 'pred' at all. Drop the stale column first so
    # the merge is repeatable.
    custom_data = custom_data.drop(columns=['pred'], errors='ignore')
    custom_data = pd.merge(
        custom_data,
        vix_signal[['Symbol', 'Date', 'pred']],
        on=['Symbol', 'Date'],
        how='left',
    )

    print(f"âœ“ Merged VIX signal data")
else:
    print(f"VIX signal file not found at {VIX_SIGNAL_PATH}")
    print("Skipping VIX merge (this is optional)")

## 8. Data Cleaning

Clean and prepare data for database insertion.

In [None]:
print("Cleaning data...")

# Ensure Date is datetime
custom_data['Date'] = pd.to_datetime(custom_data['Date'])

# A row without a Date cannot be keyed on (Sid, Date); drop it explicitly
# rather than letting a blanket fillna turn the missing date into 0.
missing_dates = custom_data['Date'].isna()
if missing_dates.any():
    print(f"  Dropping {int(missing_dates.sum()):,} rows without a valid Date...")
    custom_data = custom_data[~missing_dates].copy()

# Sort by date and symbol BEFORE forward filling. The forward fill walks rows
# in frame order, so the frame must be chronological within each symbol or
# values would be carried across time in file order.
custom_data = custom_data.sort_values(['Date', 'Symbol']).reset_index(drop=True)

# Forward fill missing values by symbol (all value columns in one pass).
print("  Forward filling missing values by symbol...")
value_columns = [col for col in custom_data.columns if col not in ('Symbol', 'Sid', 'Date')]
if value_columns:
    custom_data[value_columns] = custom_data.groupby('Symbol')[value_columns].ffill()

# Fill what is still missing using the same defaults the database cell uses
# for absent columns: 0 for numeric columns, '' for text columns. A blanket
# fillna(0) would write the integer 0 into text columns such as
# CompanyCommonName or GICSSectorName.
print("  Filling remaining NaN values...")
numeric_columns = list(custom_data.select_dtypes(include='number').columns)
if numeric_columns:
    custom_data[numeric_columns] = custom_data[numeric_columns].fillna(0)

text_columns = list(
    custom_data.select_dtypes(
        exclude=['number', 'bool', 'datetime', 'datetimetz', 'timedelta']
    ).columns
)
if text_columns:
    custom_data[text_columns] = custom_data[text_columns].fillna('')

# Convert Sid to integer (it is float after the symbol mapping).
custom_data['Sid'] = custom_data['Sid'].astype(int)

print(f"âœ“ Data cleaned")
print(f"\nFinal dataset:")
print(f"  Rows: {len(custom_data):,}")
print(f"  Columns: {len(custom_data.columns)}")
print(f"  Date range: {custom_data['Date'].min()} to {custom_data['Date'].max()}")
print(f"  Symbols: {custom_data['Symbol'].nunique()}")

# Show sample
print("\nSample cleaned data:")
custom_data.head()

## 9. Create SQLite Database

Create the custom SQLite database in Zipline format.

The notebook supports three update modes (set via `UPDATE_MODE` in Section 2, Configuration):
- **`fresh`**: Drop and recreate the database (use this for the initial load)
- **`replace`**: INSERT OR REPLACE - Updates existing records based on (Sid, Date) key
- **`ignore`**: INSERT OR IGNORE - Skips records that already exist, keeps existing data

Use `replace` mode to update data with newer values, or `ignore` mode to only add new data without overwriting existing records.

In [None]:
print(f"Creating database at {DB_PATH}...")

# Each supported update mode maps to one SQL verb. An unrecognised mode is
# rejected up front; otherwise a typo would silently fall back to a plain
# INSERT against an existing database.
SQL_COMMANDS = {
    'fresh': 'INSERT',
    'replace': 'INSERT OR REPLACE',
    'ignore': 'INSERT OR IGNORE',
}
if UPDATE_MODE not in SQL_COMMANDS:
    raise ValueError(
        f"Unknown UPDATE_MODE {UPDATE_MODE!r}; expected one of {sorted(SQL_COMMANDS)}"
    )
sql_command = SQL_COMMANDS[UPDATE_MODE]

# Handle database based on update mode
db_exists = DB_PATH.exists()
if UPDATE_MODE == 'fresh' and db_exists:
    print(f"  Removing existing database (mode='fresh')...")
    DB_PATH.unlink()
    db_exists = False
elif db_exists:
    print(f"  Database exists - will {UPDATE_MODE} existing records...")

# Create database connection. Everything that touches it runs inside
# try/finally so the connection is closed even when a statement fails.
conn = sqlite3.connect(str(DB_PATH))
try:
    cursor = conn.cursor()

    # Always make sure the table and indices exist. Both statements use
    # IF NOT EXISTS, so this is a no-op for a healthy database and repairs one
    # whose file exists without the table (e.g. after an interrupted run).
    # The UNIQUE(Sid, Date) constraint is what makes REPLACE / IGNORE work.
    columns_def = ', '.join([f'"{col}" {dtype}' for col, dtype in SCHEMA.items()])
    create_table_sql = f'''
    CREATE TABLE IF NOT EXISTS fundamentals (
        {columns_def},
        UNIQUE(Sid, Date)
    );
    '''

    if db_exists:
        print("  Ensuring table and indices exist...")
    else:
        print("  Creating table...")
    cursor.execute(create_table_sql)

    # Create indices for fast lookups
    if not db_exists:
        print("  Creating indices...")
    cursor.execute('CREATE INDEX IF NOT EXISTS idx_sid ON fundamentals(Sid);')
    cursor.execute('CREATE INDEX IF NOT EXISTS idx_date ON fundamentals(Date);')
    cursor.execute('CREATE INDEX IF NOT EXISTS idx_symbol ON fundamentals(Symbol);')

    # Insert data
    print(f"  Inserting {len(custom_data):,} rows with mode='{UPDATE_MODE}'...")

    # Work on a copy so custom_data itself is left untouched.
    insert_data = custom_data.copy()

    # Add any missing schema columns with appropriate defaults
    # ('' for TEXT, 0 for REAL / INTEGER).
    missing_cols = [col for col in SCHEMA if col not in insert_data.columns]
    for col in missing_cols:
        insert_data[col] = '' if SCHEMA[col] == 'TEXT' else 0

    if missing_cols:
        print(f"  Added {len(missing_cols)} missing columns with default values")

    # Select only the columns in the schema (in the correct order)
    insert_data = insert_data[list(SCHEMA.keys())].copy()

    # Convert Date to string format for SQLite. to_datetime is a no-op on a
    # datetime column and keeps this cell working if Date is still text.
    insert_data['Date'] = pd.to_datetime(insert_data['Date']).dt.strftime('%Y-%m-%d')

    # Create parameterized INSERT statement
    columns = list(SCHEMA.keys())
    placeholders = ', '.join(['?' for _ in columns])
    column_names = ', '.join([f'"{col}"' for col in columns])
    insert_sql = f'{sql_command} INTO fundamentals ({column_names}) VALUES ({placeholders})'

    # Insert in chunks for better performance
    chunk_size = 10000
    total_chunks = (len(insert_data) + chunk_size - 1) // chunk_size
    total_inserted = 0
    total_skipped = 0

    for i in range(0, len(insert_data), chunk_size):
        chunk = insert_data.iloc[i:i+chunk_size]

        # Execute batch insert
        cursor.executemany(insert_sql, chunk.values.tolist())

        rows_affected = cursor.rowcount
        total_inserted += rows_affected
        if UPDATE_MODE == 'ignore':
            # With INSERT OR IGNORE, rowcount shows actual inserts (not skipped)
            total_skipped += len(chunk) - rows_affected

        chunk_num = i // chunk_size + 1
        if chunk_num % 10 == 0 or chunk_num == total_chunks:
            print(f"    Processed chunk {chunk_num}/{total_chunks} ({i+len(chunk):,} rows)...")

    conn.commit()

    # Report results
    print(f"\nâœ“ Database operation completed!")
    print(f"  Path: {DB_PATH}")
    print(f"  Mode: {UPDATE_MODE}")
    print(f"  Rows processed: {len(insert_data):,}")
    if UPDATE_MODE == 'ignore' and total_skipped > 0:
        print(f"  Rows inserted: {total_inserted:,}")
        print(f"  Rows skipped (already existed): {total_skipped:,}")

    # Get final row count
    cursor.execute("SELECT COUNT(*) FROM fundamentals")
    total_rows = cursor.fetchone()[0]
    print(f"  Total rows in database: {total_rows:,}")
finally:
    # Closing without a commit discards any partially inserted batch.
    conn.close()

print(f"  Size: {DB_PATH.stat().st_size / 1024 / 1024:.1f} MB")

## 10. Define Database Class

Create a Database class to use this data in Zipline Pipeline.

In [None]:
# Define the Database class
class REFEFundamentals(Database):
    """
    Custom REFE Fundamentals database.

    Declares one Column per value field so the fields can be referenced in a
    Pipeline. CODE is set to DATABASE_NAME, the same name used for the SQLite
    file; the loader factory in the pipeline examples recognises this dataset
    by comparing CODE and passes it to CustomSQLiteLoader.

    Attribute names are identical to keys of SCHEMA. The SCHEMA keys Symbol,
    Sid, Date and TradeDate have no Column here. Two names carry spelling
    mistakes (ReturnOnEquity_SmartEstimat, EPS_SurpirsePrct_prev_Q); they match
    the SCHEMA keys, so renaming one side alone would break the link.

    Usage in Pipeline:
        roe = REFEFundamentals.ReturnOnEquity_SmartEstimat.latest
        sector = REFEFundamentals.GICSSectorName.latest
    """
    
    CODE = DATABASE_NAME
    # NOTE(review): how the Database base class interprets LOOKBACK_WINDOW is
    # not visible in this notebook - presumably a number of days; confirm.
    LOOKBACK_WINDOW = 252  # Days to look back
    
    # Price and volume
    RefPriceClose = Column(float)
    RefVolume = Column(float)
    
    # Company info
    CompanyCommonName = Column(str)
    GICSSectorName = Column(str)
    
    # Valuation metrics
    EnterpriseValue_DailyTimeSeries_ = Column(float)
    CompanyMarketCap = Column(float)
    
    # Cash flow
    FOCFExDividends_Discrete = Column(float)
    CashFlowComponent_Current = Column(float)
    CashFlowPerShare_BrokerEstimate = Column(float)
    FreeCashFlowPerShare_BrokerEstimate = Column(float)
    
    # Debt and interest
    InterestExpense_NetofCapitalizedInterest = Column(float)
    Debt_Total = Column(float)
    
    # Earnings
    EarningsPerShare_Actual = Column(float)
    EarningsPerShare_SmartEstimate_prev_Q = Column(float)
    EarningsPerShare_ActualSurprise = Column(float)
    EarningsPerShare_SmartEstimate_current_Q = Column(float)
    EPS_SurpirsePrct_prev_Q = Column(float)  # spelling matches the SCHEMA key
    
    # Growth and targets
    LongTermGrowth_Mean = Column(float)
    PriceTarget_Median = Column(float)
    Estpricegrowth_percent = Column(float)
    
    # Rankings
    CombinedAlphaModelSectorRank = Column(float)
    CombinedAlphaModelSectorRankChange = Column(float)
    CombinedAlphaModelRegionRank = Column(float)
    EarningsQualityRegionRank_Current = Column(float)
    
    # Ratios
    EnterpriseValueToEBIT_DailyTimeSeriesRatio_ = Column(float)
    EnterpriseValueToEBITDA_DailyTimeSeriesRatio_ = Column(float)
    EnterpriseValueToSales_DailyTimeSeriesRatio_ = Column(float)
    ForwardPEG_DailyTimeSeriesRatio_ = Column(float)
    PriceEarningsToGrowthRatio_SmartEstimate_ = Column(float)
    ForwardPriceToCashFlowPerShare_DailyTimeSeriesRatio_ = Column(float)
    ForwardPriceToSalesPerShare_DailyTimeSeriesRatio_ = Column(float)
    ForwardEnterpriseValueToOperatingCashFlow_DailyTimeSeriesRatio_ = Column(float)
    
    # Returns
    ReturnOnInvestedCapital_BrokerEstimate = Column(float)
    ReturnOnCapitalEmployed_Actual = Column(float)
    ReturnOnEquity_SmartEstimat = Column(float)  # spelling matches the SCHEMA key
    ReturnOnAssets_SmartEstimate = Column(float)
    
    # Margins
    GrossProfitMargin_ = Column(float)
    GrossProfitMargin_ActualSurprise = Column(float)
    
    # Analyst recommendations
    Recommendation_NumberOfTotal = Column(float)
    Recommendation_Median_1_5_ = Column(float)
    Recommendation_NumberOfStrongBuy = Column(float)
    Recommendation_NumberOfBuy = Column(float)
    Recommendation_Mean_1_5_ = Column(float)
    
    # Cash
    CashCashEquivalents_Total = Column(float)
    
    # Dividends
    Dividend_Per_Share_SmartEstimate = Column(float)
    
    # VIX prediction signal
    pred = Column(float)


# Summarise the class that was just defined.
print("âœ“ REFEFundamentals Database class defined")
print(f"  Database code: {REFEFundamentals.CODE}")
print(f"  Lookback window: {REFEFundamentals.LOOKBACK_WINDOW} days")

# Count the class attributes that are Column instances.
column_attribute_names = [
    attr_name
    for attr_name in dir(REFEFundamentals)
    if isinstance(getattr(REFEFundamentals, attr_name), Column)
]
print(f"  Columns defined: {len(column_attribute_names)}")

# Short reminder of how the columns are referenced in a pipeline.
example_usage = (
    "\nExample usage:",
    "  roe = REFEFundamentals.ReturnOnEquity_SmartEstimat.latest",
    "  pe_growth = REFEFundamentals.PriceEarningsToGrowthRatio_SmartEstimate_.latest",
    "  sector = REFEFundamentals.GICSSectorName.latest",
)
for usage_line in example_usage:
    print(usage_line)

## 11. Verify Database

Query the database to verify data was loaded correctly.

In [None]:
def _latest_rows(connection, symbol, extra_column, limit=5):
    """Return the newest `limit` rows stored for `symbol`.

    The result holds Date, Symbol, RefPriceClose, CompanyMarketCap,
    ReturnOnEquity_SmartEstimat and `extra_column`, newest Date first.
    `symbol` and `limit` are bound as SQL parameters. `extra_column` is
    interpolated into the statement and must be a trusted column name from
    this notebook, never user input.
    """
    query = f"""
        SELECT Date, Symbol, RefPriceClose, CompanyMarketCap,
               ReturnOnEquity_SmartEstimat, {extra_column}
        FROM fundamentals
        WHERE Symbol = ?
        ORDER BY Date DESC
        LIMIT ?
    """
    return pd.read_sql(query, connection, params=(symbol, limit))


# sqlite3.connect() silently creates an empty file when the path is missing,
# which would then fail with a confusing "no such table" error.
if not DB_PATH.exists():
    raise FileNotFoundError(f"Database not found at {DB_PATH}; run the database creation cell first")

# Connect and query; the connection is closed even if a query fails.
conn = sqlite3.connect(str(DB_PATH))
try:
    # Get row count
    row_count = pd.read_sql("SELECT COUNT(*) as count FROM fundamentals", conn).iloc[0, 0]
    print(f"Total rows in database: {row_count:,}")

    # Get date range
    date_range = pd.read_sql("SELECT MIN(Date) as min_date, MAX(Date) as max_date FROM fundamentals", conn)
    print(f"Date range: {date_range.iloc[0, 0]} to {date_range.iloc[0, 1]}")

    # Get symbol count
    symbol_count = pd.read_sql("SELECT COUNT(DISTINCT Symbol) as count FROM fundamentals", conn).iloc[0, 0]
    print(f"Unique symbols: {symbol_count:,}")

    # Show sample data for specific symbols
    print("\nSample data for AAPL:")
    aapl_data = _latest_rows(conn, 'AAPL', 'PriceTarget_Median')
    print(aapl_data)

    print("\nSample data for IBM:")
    ibm_data = _latest_rows(conn, 'IBM', 'GICSSectorName')
    print(ibm_data)
finally:
    conn.close()

print("\nâœ“ Database verification complete")

## 12. Usage Example

Example of how to use this database in a backtest.

In [None]:
# Printed walkthrough of how to use the database in a backtest. The lines
# are kept in one tuple and emitted with a single print.
usage_lines = (
    "To use this database in your backtests:",
    "\n1. Import the Database class:",
    "   from zipline.pipeline.data.db import Database, Column",
    "\n2. Define the REFEFundamentals class (from cell 10 above)",
    "\n3. Use in your pipeline:",
    "   ",
    "   def make_pipeline():",
    "       roe = REFEFundamentals.ReturnOnEquity_SmartEstimat.latest",
    "       growth = REFEFundamentals.LongTermGrowth_Mean.latest",
    "       sector = REFEFundamentals.GICSSectorName.latest",
    "       ",
    "       # Screen for quality companies",
    "       quality = (roe > 15) & (growth > 10)",
    "       ",
    "       return Pipeline(",
    "           columns={",
    "               'ROE': roe,",
    "               'Growth': growth,",
    "               'Sector': sector,",
    "           },",
    "           screen=quality",
    "       )",
    "\n4. The CustomSQLiteLoader will automatically load data based on REFEFundamentals.CODE",
    "\nâœ“ Setup complete! Your custom fundamentals database is ready to use.",
)
print("\n".join(usage_lines))

## Summary

This notebook:
1. ✅ Loaded CSV files with fundamental data
2. ✅ Mapped symbols to Zipline SIDs using the asset finder
3. ✅ Cleaned and prepared the data
4. ✅ Created a custom SQLite database in ~/.zipline/data/custom/
5. ✅ Defined a Database class for use in Pipeline
6. ✅ Verified the database contents

The database is now ready to use in your Zipline backtests with the CustomSQLiteLoader.

**Next steps:**
- See the examples below for using the data with Pipeline
- Copy the REFEFundamentals class definition to your backtest algorithm
- Use REFEFundamentals.ColumnName.latest in your pipeline
- The backtest_helpers.py will automatically detect and load the data

## 13. Pipeline Examples

Now let's demonstrate how to query and analyze the fundamentals data using Zipline Pipeline.

These examples show:
- Creating a pipeline with custom fundamentals
- Running the pipeline over date ranges
- Filtering stocks by fundamental criteria
- Extracting time series data for specific symbols
- Combining multiple fundamental factors

## 13. Pipeline Examples

Now let's demonstrate how to query and analyze the fundamentals data using Zipline Pipeline.

These examples show:
- Creating a pipeline with custom fundamentals
- Running the pipeline over date ranges
- Filtering stocks by fundamental criteria
- Extracting time series data for specific symbols
- Combining multiple fundamental factors


### Example 1: Setup Pipeline Engine

First, we need to set up the Pipeline engine to load our custom data.

In [None]:
from zipline.pipeline import Pipeline
from zipline.pipeline.engine import SimplePipelineEngine
from zipline.pipeline.loaders import USEquityPricingLoader
from zipline.pipeline.data import USEquityPricing
from zipline.pipeline.domain import US_EQUITIES
from zipline.utils.calendar_utils import get_calendar

# Import the custom loader from zipline
from zipline.data.custom import CustomSQLiteLoader

# Get the trading calendar
trading_calendar = get_calendar('NYSE')

# Loader instances are created lazily and then reused. Returning a brand-new
# loader for every column prevents the engine from grouping columns that
# share a loader, so each column would trigger its own load.
# Re-running this cell resets the cache.
_PIPELINE_LOADERS = {}

# Set up the pipeline engine with our custom loaders
def get_pipeline_loader(column):
    """
    Pipeline loader factory that routes columns to appropriate loaders.

    Parameters
    ----------
    column
        The pipeline column the engine needs data for.

    Returns
    -------
    The shared CustomSQLiteLoader for columns whose dataset CODE equals
    REFEFundamentals.CODE, or the shared USEquityPricingLoader for columns
    found in USEquityPricing.columns.

    Raises
    ------
    ValueError
        If the column belongs to neither dataset.
    """
    # Route custom fundamentals to CustomSQLiteLoader
    # Check by database CODE since datasets get bound to domains
    if hasattr(column.dataset, 'CODE') and column.dataset.CODE == REFEFundamentals.CODE:
        if 'custom' not in _PIPELINE_LOADERS:
            # Pass the database CODE to the loader
            _PIPELINE_LOADERS['custom'] = CustomSQLiteLoader(REFEFundamentals.CODE)
        return _PIPELINE_LOADERS['custom']

    # Route pricing data to bundle
    if column in USEquityPricing.columns:
        if 'pricing' not in _PIPELINE_LOADERS:
            _PIPELINE_LOADERS['pricing'] = USEquityPricingLoader(
                bundle_data.equity_daily_bar_reader,
                bundle_data.adjustment_reader,
            )
        return _PIPELINE_LOADERS['pricing']

    raise ValueError(f"No loader for {column}")

# Build the engine that will execute every pipeline in the examples below.
engine_options = {
    'get_loader': get_pipeline_loader,
    'asset_finder': asset_finder,
    'default_domain': US_EQUITIES,
}
engine = SimplePipelineEngine(**engine_options)

security_count = len(asset_finder.sids)
print("âœ“ Pipeline engine configured with custom fundamentals loader")
print(f"  Trading calendar: {trading_calendar.name}")
print(f"  Asset finder: {security_count:,} securities")

### Example 2: Basic Pipeline - Get Latest Fundamentals

Create a simple pipeline to get the latest fundamentals for all stocks.

In [None]:
# Define a simple pipeline
def make_basic_pipeline():
    """
    Build a screen-less pipeline exposing the latest value of six
    REFEFundamentals columns (ROE, ROA, market cap, price, sector, EV/EBITDA).
    """
    fundamentals = REFEFundamentals
    latest_columns = {
        'ROE': fundamentals.ReturnOnEquity_SmartEstimat.latest,
        'ROA': fundamentals.ReturnOnAssets_SmartEstimate.latest,
        'Market_Cap': fundamentals.CompanyMarketCap.latest,
        'Price': fundamentals.RefPriceClose.latest,
        'Sector': fundamentals.GICSSectorName.latest,
        'EV_to_EBITDA': fundamentals.EnterpriseValueToEBITDA_DailyTimeSeriesRatio_.latest,
    }
    return Pipeline(columns=latest_columns)

# Run the pipeline for a single date
# Get a recent valid trading session from the calendar (last 3 months)
pipeline = make_basic_pipeline()

# sessions_in_range expects dates: timezone-naive AND with a 00:00 time
# component. pd.Timestamp.now() is already timezone-naive but carries the
# current time of day, so it is normalized to midnight first.
end_search = pd.Timestamp.now().normalize()
start_search = end_search - pd.DateOffset(months=3)

sessions = trading_calendar.sessions_in_range(start_search, end_search)
if len(sessions) < 5:
    raise RuntimeError(
        f"Expected at least 5 trading sessions between {start_search.date()} "
        f"and {end_search.date()}, found {len(sessions)}"
    )
start_date = sessions[-5]  # Use 5 days back from the end
end_date = start_date

print(f"Using date: {start_date.date()}")

result = engine.run_pipeline(pipeline, start_date, end_date)

print(f"âœ“ Pipeline run complete")
print(f"  Date: {start_date.date()}")
print(f"  Stocks: {len(result):,}")
print(f"\nTop 10 stocks by ROE:")
print(result.nlargest(10, 'ROE')[['ROE', 'ROA', 'Market_Cap', 'Sector']])

### Example 3: Filtered Pipeline - Quality Stocks

Filter stocks based on fundamental criteria (e.g., high ROE, profitable, large cap).

In [None]:
def make_quality_pipeline():
    """
    Screen for quality stocks with strong fundamentals.

    Returns a Pipeline whose columns are ROE, ROA, market cap, growth, price,
    price target, upside (%) and sector, restricted to assets that pass every
    condition of the quality screen below.
    """
    # Get fundamentals
    roe = REFEFundamentals.ReturnOnEquity_SmartEstimat.latest
    roa = REFEFundamentals.ReturnOnAssets_SmartEstimate.latest
    market_cap = REFEFundamentals.CompanyMarketCap.latest
    growth = REFEFundamentals.LongTermGrowth_Mean.latest
    price_target = REFEFundamentals.PriceTarget_Median.latest
    current_price = REFEFundamentals.RefPriceClose.latest
    sector = REFEFundamentals.GICSSectorName.latest

    # Calculate upside potential (percent difference between target and price)
    upside = ((price_target - current_price) / current_price) * 100

    # Define quality screen
    quality_screen = (
        # Missing prices are stored as 0 by the cleaning step. Dividing by a
        # zero price makes the upside infinite, which would pass the
        # `upside > 10` test, so rows without a usable price are excluded.
        (current_price > 0) &
        (roe > 15) &  # Strong return on equity
        (roa > 5) &   # Profitable
        (market_cap > 1_000_000_000) &  # Large cap ($1B+)
        (growth > 10) &  # Double-digit growth
        (upside > 10)  # At least 10% upside
    )

    return Pipeline(
        columns={
            'ROE': roe,
            'ROA': roa,
            'Market_Cap': market_cap,
            'Growth': growth,
            'Price': current_price,
            'Target': price_target,
            'Upside_%': upside,
            'Sector': sector,
        },
        screen=quality_screen,
    )

# Run the filtered pipeline
# Use a recent valid trading session (last 3 months)
# sessions_in_range expects dates: timezone-naive AND with a 00:00 time
# component. pd.Timestamp.now() is already timezone-naive but carries the
# current time of day, so it is normalized to midnight first.
end_search = pd.Timestamp.now().normalize()
start_search = end_search - pd.DateOffset(months=3)

sessions = trading_calendar.sessions_in_range(start_search, end_search)
if len(sessions) < 5:
    raise RuntimeError(
        f"Expected at least 5 trading sessions between {start_search.date()} "
        f"and {end_search.date()}, found {len(sessions)}"
    )
start_date = sessions[-5]  # Use 5 days back from the end

pipeline = make_quality_pipeline()
result = engine.run_pipeline(pipeline, start_date, start_date)

print(f"âœ“ Quality screen results:")
print(f"  Date: {start_date.date()}")
print(f"  Stocks passing screen: {len(result)}")
print(f"\nTop 10 by upside potential:")
print(result.nlargest(10, 'Upside_%')[['ROE', 'Growth', 'Price', 'Target', 'Upside_%', 'Sector']])

### Example 4: Time Series Data - Track Fundamentals Over Time

Get historical fundamental data for specific symbols to analyze trends.

In [None]:
# Define symbols to track
symbols = ['AAPL', 'MSFT', 'GOOGL']

# Get the assets
assets = [asset_finder.lookup_symbol(sym, as_of_date=None) for sym in symbols]

# Create pipeline
pipeline = Pipeline(
    columns={
        'ROE': REFEFundamentals.ReturnOnEquity_SmartEstimat.latest,
        'Market_Cap': REFEFundamentals.CompanyMarketCap.latest,
        'Price': REFEFundamentals.RefPriceClose.latest,
        'Growth': REFEFundamentals.LongTermGrowth_Mean.latest,
        'EV_EBITDA': REFEFundamentals.EnterpriseValueToEBITDA_DailyTimeSeriesRatio_.latest,
    }
)

# Run over a date range: the two months ending 5 sessions before the latest
# session found in the last 3 months of the calendar.
# sessions_in_range expects dates: timezone-naive AND with a 00:00 time
# component. pd.Timestamp.now() is already timezone-naive but carries the
# current time of day, so it is normalized to midnight first.
end_search = pd.Timestamp.now().normalize()
start_search = end_search - pd.DateOffset(months=3)

sessions = trading_calendar.sessions_in_range(start_search, end_search)
if len(sessions) < 5:
    raise RuntimeError(
        f"Expected at least 5 trading sessions between {start_search.date()} "
        f"and {end_search.date()}, found {len(sessions)}"
    )
end_date = sessions[-5]  # Use 5 days back from the end
start_date = end_date - pd.DateOffset(months=2)

# Ensure start_date is a valid trading session (first session on or after it)
start_date = trading_calendar.sessions_in_range(start_date.tz_localize(None), end_date.tz_localize(None))[0]

print(f"Date range: {start_date.date()} to {end_date.date()}")

result = engine.run_pipeline(pipeline, start_date, end_date)

print(f"âœ“ Time series data extracted")
print(f"  Period: {start_date.date()} to {end_date.date()}")
print(f"  Total observations: {len(result):,}")

# Filter to our symbols of interest
symbol_data = result[result.index.get_level_values('asset').isin(assets)]

print(f"  Observations for {symbols}: {len(symbol_data):,}")

# Show AAPL time series (every date, AAPL asset only)
aapl_asset = assets[0]
aapl_data = symbol_data.loc[pd.IndexSlice[:, aapl_asset], :]

print(f"\nAAPL Fundamental Trends (last 10 observations):")
print(aapl_data.tail(10)[['ROE', 'Market_Cap', 'Price', 'Growth']])

### Example 5: Visualize Time Series - Plot Fundamental Trends

Create charts to visualize how fundamentals change over time.

In [None]:
import matplotlib.pyplot as plt
import matplotlib.dates as mdates

# Define symbols to track (re-define in case previous cell wasn't run)
symbols = ['AAPL', 'MSFT', 'GOOGL']
assets = [asset_finder.lookup_symbol(sym, as_of_date=None) for sym in symbols]

# Fetch the time series only if the previous cell has not already done so
if 'symbol_data' not in locals():
    print("Fetching time series data...")
    pipeline = Pipeline(
        columns={
            'ROE': REFEFundamentals.ReturnOnEquity_SmartEstimat.latest,
            'Market_Cap': REFEFundamentals.CompanyMarketCap.latest,
            'Price': REFEFundamentals.RefPriceClose.latest,
            'Growth': REFEFundamentals.LongTermGrowth_Mean.latest,
            'EV_EBITDA': REFEFundamentals.EnterpriseValueToEBITDA_DailyTimeSeriesRatio_.latest,
        }
    )

    # sessions_in_range expects dates: timezone-naive AND with a 00:00 time
    # component. pd.Timestamp.now() is already timezone-naive but carries the
    # current time of day, so it is normalized to midnight first.
    end_search = pd.Timestamp.now().normalize()
    start_search = end_search - pd.DateOffset(months=3)

    sessions = trading_calendar.sessions_in_range(start_search, end_search)
    if len(sessions) < 5:
        raise RuntimeError(
            f"Expected at least 5 trading sessions between {start_search.date()} "
            f"and {end_search.date()}, found {len(sessions)}"
        )
    end_date = sessions[-5]
    start_date = end_date - pd.DateOffset(months=2)
    start_date = trading_calendar.sessions_in_range(start_date.tz_localize(None), end_date.tz_localize(None))[0]

    result = engine.run_pipeline(pipeline, start_date, end_date)
    symbol_data = result[result.index.get_level_values('asset').isin(assets)]
    print(f"âœ“ Fetched {len(symbol_data):,} observations")

# Create figure with subplots
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
fig.suptitle('Fundamental Trends: AAPL, MSFT, GOOGL', fontsize=16, fontweight='bold')

symbol_colors = {'AAPL': 'blue', 'MSFT': 'green', 'GOOGL': 'red'}

# One entry per panel:
# (axis, data column, divisor or None, marker, title, y label, show x label).
# Market cap is divided by 1e9 so the axis reads in billions; only the bottom
# row carries an x label.
panels = [
    (axes[0, 0], 'ROE', None, 'o', 'Return on Equity (%)', 'ROE (%)', False),
    (axes[0, 1], 'Market_Cap', 1e9, 's', 'Market Capitalization', 'Market Cap ($B)', False),
    (axes[1, 0], 'Growth', None, '^', 'Long-term Growth Rate', 'Growth (%)', True),
    (axes[1, 1], 'EV_EBITDA', None, 'D', 'Enterprise Value / EBITDA', 'EV/EBITDA Ratio', True),
]

# Draw one line per symbol on every panel
for symbol, asset in zip(symbols, assets):
    sym_data = symbol_data.loc[pd.IndexSlice[:, asset], :].reset_index()
    color = symbol_colors[symbol]

    for ax, column, divisor, marker, _title, _ylabel, _show_xlabel in panels:
        values = sym_data[column] if divisor is None else sym_data[column] / divisor
        ax.plot(sym_data['date'], values,
                label=symbol, marker=marker, color=color, alpha=0.7)

# Customize subplots
for ax, _column, _divisor, _marker, title, ylabel, show_xlabel in panels:
    ax.set_title(title, fontweight='bold')
    ax.set_ylabel(ylabel)
    if show_xlabel:
        ax.set_xlabel('Date')
    ax.legend()
    ax.grid(True, alpha=0.3)

# Format x-axis dates
for ax in axes.flat:
    ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m'))
    ax.xaxis.set_major_locator(mdates.MonthLocator(interval=1))
    plt.setp(ax.xaxis.get_majorticklabels(), rotation=45, ha='right')

plt.tight_layout()
plt.show()

print("âœ“ Fundamental trends visualization complete")

### Pipeline Examples Summary

You now know how to:
- ✅ Set up a Pipeline engine with custom fundamentals
- ✅ Query latest fundamentals for all stocks
- ✅ Filter stocks using fundamental criteria
- ✅ Extract time series data for specific symbols
- ✅ Visualize fundamental trends over time

**Key takeaways:**
- Use `REFEFundamentals.ColumnName.latest` to access any fundamental metric
- Combine multiple metrics with boolean operators (`&`, `|`) for screening
- Run pipelines over date ranges to analyze trends
- Filter results by asset to focus on specific symbols
- Integrate with matplotlib for visualization

**Next steps:**
- Use these patterns in your backtesting algorithms
- Create custom factors combining multiple fundamentals
- Integrate with price data from USEquityPricing
- Build sophisticated stock selection strategies