# Pipeline Research & Data Exploration

This notebook demonstrates how to use Zipline's Pipeline for research and stock screening with Sharadar data.

## What You'll Learn

- Run Pipeline queries on Sharadar bundle data
- Calculate technical factors (momentum, volatility, etc.)
- Screen stocks based on criteria
- Visualize results
- Export data for further analysis

## Prerequisites

Make sure you have the Sharadar bundle ingested:

```bash
# Set API key
export NASDAQ_DATA_LINK_API_KEY='your_key_here'

# Ingest specific tickers for testing
python scripts/manage_sharadar.py ingest --tickers AAPL,MSFT,GOOGL,AMZN,TSLA,META,NVDA,NFLX,AMD,INTC

# OR ingest full database
python scripts/manage_sharadar.py ingest --all

# Check status
python scripts/manage_sharadar.py status
```

## Step 1: Setup and Imports

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import warnings
from datetime import datetime, timedelta

# Zipline imports
from zipline.pipeline import Pipeline
from zipline.pipeline.data import USEquityPricing
from zipline.pipeline.factors import (
    SimpleMovingAverage,
    EWMA,
    Returns,
    AverageDollarVolume,
    BollingerBands,
    CustomFactor,
)
from zipline.pipeline.filters import StaticAssets
from zipline.data.bundles import load
from zipline.utils.calendar_utils import get_calendar
from zipline.pipeline.engine import SimplePipelineEngine
from zipline.pipeline.loaders import USEquityPricingLoader

warnings.filterwarnings('ignore')
plt.style.use('seaborn-v0_8-whitegrid')
sns.set_palette("husl")
%matplotlib inline

print("✓ Libraries imported successfully!")

## Step 2: Load Bundle Data

In [None]:
# Choose your bundle
BUNDLE_NAME = 'sharadar'  # or 'yahoo', 'yahoo-tech', etc.

print(f"Loading bundle: {BUNDLE_NAME}...\n")

# Load bundle
bundle_data = load(BUNDLE_NAME)

# Get calendar
trading_calendar = get_calendar('NYSE')

# Get asset finder
asset_finder = bundle_data.asset_finder

# List available assets
all_assets = asset_finder.retrieve_all(asset_finder.sids)

print(f"✓ Bundle loaded successfully!")
print(f"  Assets available: {len(all_assets)}\n")

# Display sample assets
print("Sample assets:")
for asset in list(all_assets)[:10]:
    print(f"  {asset.symbol}: {asset.start_date.date()} to {asset.end_date.date()}")

## Step 3: Setup Pipeline Engine

In [None]:
# Create pricing loader
pricing_loader = USEquityPricingLoader.without_fx(
    bundle_data.equity_daily_bar_reader,
    bundle_data.adjustment_reader,
)

# Create engine
def get_loader(column):
    if column in USEquityPricing.columns:
        return pricing_loader
    raise ValueError(f"No loader for {column}")

engine = SimplePipelineEngine(
    get_loader=get_loader,
    asset_finder=asset_finder,
)

print("✓ Pipeline engine ready!")
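Pipeline output for a given session is computed from data through the previous session. Long windows therefore need history before `start_date`: the 200-day SMA used later needs roughly 200 sessions of bundle data before the first output date. The sketch below estimates that date from any session index. `earliest_history_needed` is a hypothetical helper, and `pd.bdate_range` is a weekday-only stand-in for the NYSE calendar that does not remove holidays, so treat the result as approximate. The exact count the engine uses may differ by a session. With the real calendar, pass its session index instead.

```python
import pandas as pd


def earliest_history_needed(start_date, max_window_length, sessions):
    """Return the earliest session a pipeline starting at start_date reads.

    Assumes the output for session D uses the max_window_length sessions
    ending at the session before D (no look-ahead).
    """
    sessions = pd.DatetimeIndex(sessions)
    start_pos = sessions.get_loc(pd.Timestamp(start_date))
    first_pos = start_pos - max_window_length
    if first_pos < 0:
        raise ValueError("calendar does not reach far enough back for this window")
    return sessions[first_pos]


# Stand-in calendar: weekdays only, so NYSE holidays are NOT removed.
demo_sessions = pd.bdate_range("2022-01-03", "2023-12-29")
print(earliest_history_needed("2023-12-01", 200, demo_sessions))
```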

## Step 4: Define Custom Factors

Let's create some useful technical factors for research:

In [None]:
class Momentum(CustomFactor):
    """Fractional price change over the window (window_length - 1 trading days)"""
    inputs = [USEquityPricing.close]
    window_length = 21  # 21 closes -> 20-day change
    
    def compute(self, today, assets, out, close):
        # close has shape (window_length, n_assets); row 0 is the oldest close
        out[:] = (close[-1] - close[0]) / close[0]


class Volatility(CustomFactor):
    """Standard deviation of daily returns over 20 days (not annualized)"""
    inputs = [USEquityPricing.close]
    window_length = 21  # 21 closes -> 20 daily returns
    
    def compute(self, today, assets, out, close):
        daily_returns = np.diff(close, axis=0) / close[:-1]
        out[:] = np.std(daily_returns, axis=0)


class RelativeStrengthIndex(CustomFactor):
    """14-period RSI using simple averages of gains and losses (no Wilder smoothing)"""
    inputs = [USEquityPricing.close]
    window_length = 15  # 15 closes -> 14 price changes
    
    def compute(self, today, assets, out, close):
        diffs = np.diff(close, axis=0)
        gains = np.where(diffs > 0, diffs, 0)
        losses = np.where(diffs < 0, -diffs, 0)
        
        avg_gains = np.mean(gains, axis=0)
        avg_losses = np.mean(losses, axis=0)
        
        rs = avg_gains / (avg_losses + 1e-10)  # Avoid division by zero
        out[:] = 100 - (100 / (1 + rs))


class TrueRange(CustomFactor):
    """True Range (for ATR calculation)"""
    inputs = [USEquityPricing.high, USEquityPricing.low, USEquityPricing.close]
    window_length = 2
    
    def compute(self, today, assets, out, high, low, close):
        high_low = high[-1] - low[-1]
        high_close = np.abs(high[-1] - close[-2])
        low_close = np.abs(low[-1] - close[-2])
        out[:] = np.maximum(high_low, np.maximum(high_close, low_close))


print("✓ Custom factors defined")
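Each `compute` method receives its inputs as 2-D arrays with one row per session in the window (oldest first) and one column per asset, and fills `out` with one value per asset. That makes the formulas easy to sanity-check with plain NumPy before running the engine. The sketch below copies the Momentum and RSI formulas into standalone functions (`momentum_formula` and `rsi_formula` are hypothetical helpers, not Zipline APIs) and applies them to synthetic prices. It also shows that a window of N rows spans N - 1 daily returns, the same convention as Zipline's built-in `Returns` factor.

```python
import numpy as np


def momentum_formula(close):
    # Same formula as Momentum.compute: change from oldest to newest close.
    return (close[-1] - close[0]) / close[0]


def rsi_formula(close):
    # Same simple-average RSI as RelativeStrengthIndex.compute.
    diffs = np.diff(close, axis=0)
    gains = np.where(diffs > 0, diffs, 0.0)
    losses = np.where(diffs < 0, -diffs, 0.0)
    rs = gains.mean(axis=0) / (losses.mean(axis=0) + 1e-10)
    return 100 - 100 / (1 + rs)


# Synthetic window: 21 rows (sessions) x 2 columns (assets).
days = np.arange(21)
demo_close = np.column_stack([
    100 * 1.01 ** days,                             # asset 0: +1% every day
    np.r_[np.full(10, 50.0), np.full(11, 49.0)],    # asset 1: one down move
])

demo_momentum = momentum_formula(demo_close)
# Compounding the 20 daily returns gives the same number as the formula.
compounded = np.prod(1 + np.diff(demo_close, axis=0) / demo_close[:-1], axis=0) - 1
demo_rsi = rsi_formula(demo_close)
print(demo_close.shape, demo_momentum, compounded, demo_rsi)
```

Asset 0 only rises, so its RSI is close to 100; asset 1 only has one loss and no gains, so its RSI is 0.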

## Step 5: Create Research Pipeline

Build a comprehensive pipeline with multiple factors:

In [None]:
def make_research_pipeline():
    """
    Create a pipeline for stock research and screening.
    """
    # Price data
    close = USEquityPricing.close.latest
    volume = USEquityPricing.volume.latest
    
    # Moving averages
    sma_10 = SimpleMovingAverage(inputs=[USEquityPricing.close], window_length=10)
    sma_20 = SimpleMovingAverage(inputs=[USEquityPricing.close], window_length=20)
    sma_50 = SimpleMovingAverage(inputs=[USEquityPricing.close], window_length=50)
    sma_200 = SimpleMovingAverage(inputs=[USEquityPricing.close], window_length=200)
    
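    # Exponential moving averages: EWMA requires a decay_rate, so build it from a span.
    # window_length is ~3x the span so the truncated weights are negligible.
    ema_12 = EWMA.from_span(inputs=[USEquityPricing.close], window_length=36, span=12)
    ema_26 = EWMA.from_span(inputs=[USEquityPricing.close], window_length=78, span=26)
    
    # Technical indicators
    momentum_20 = Momentum(window_length=21)  # 20-day change
    momentum_60 = Momentum(window_length=61)  # 60-day change
    volatility = Volatility()
    rsi = RelativeStrengthIndex()
    true_range = TrueRange()
    
    # Returns: window_length N measures the change over N - 1 trading days
    returns_1d = Returns(window_length=2)
    returns_5d = Returns(window_length=6)
    returns_20d = Returns(window_length=21)
    
    # Volume
    dollar_volume = AverageDollarVolume(window_length=30)
    
    # Bollinger Bands (k = number of standard deviations, a required parameter)
    bb = BollingerBands(inputs=[USEquityPricing.close], window_length=20, k=2)
    
    # Trading signals (current state, not the day a crossover happened)
    golden_cross = sma_50 > sma_200  # SMA50 currently above SMA200
    death_cross = sma_50 < sma_200   # SMA50 currently below SMA200
    above_sma20 = close > sma_20
    macd = ema_12 - ema_26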
    
    # RSI signals
    rsi_oversold = rsi < 30
    rsi_overbought = rsi > 70
    
    # Liquidity filter (top 500 by dollar volume)
    liquid = dollar_volume.top(500)
    
    return Pipeline(
        columns={
            # Price & Volume
            'close': close,
            'volume': volume,
            'dollar_volume': dollar_volume,
            
            # Moving Averages
            'sma_10': sma_10,
            'sma_20': sma_20,
            'sma_50': sma_50,
            'sma_200': sma_200,
            'ema_12': ema_12,
            'ema_26': ema_26,
            
            # Technical Indicators
            'momentum_20d': momentum_20,
            'momentum_60d': momentum_60,
            'volatility': volatility,
            'rsi': rsi,
            'true_range': true_range,
            
            # Returns
            'returns_1d': returns_1d,
            'returns_5d': returns_5d,
            'returns_20d': returns_20d,
            
            # Bollinger Bands
            'bb_upper': bb.upper,
            'bb_middle': bb.middle,
            'bb_lower': bb.lower,
            
            # Signals
            'golden_cross': golden_cross,
            'death_cross': death_cross,
            'above_sma20': above_sma20,
            'macd': macd,
            'rsi_oversold': rsi_oversold,
            'rsi_overbought': rsi_overbought,
        },
        screen=liquid,  # Only liquid stocks
    )

pipeline = make_research_pipeline()
print(f"✓ Pipeline created with {len(pipeline.columns)} columns")
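Zipline's `EWMA` factor takes a `decay_rate` parameter, and `EWMA.from_span` derives it as `decay_rate = 1 - 2 / (span + 1)`, the same span convention pandas uses for `ewm(span=...)`. The sketch below (`ewma_window` is a hypothetical helper) computes a normalized exponentially weighted mean over a finite window and checks it against the last value of pandas' `adjust=True` result. Because the weights shrink geometrically, at a lag of about 3x the span they fall to roughly 0.25% of the newest weight, which is one reasonable way to choose `window_length`.

```python
import numpy as np
import pandas as pd


def ewma_window(values, span):
    """Exponentially weighted mean of a finite window (newest value last)."""
    decay_rate = 1.0 - 2.0 / (span + 1.0)
    n = len(values)
    # Newest value gets weight decay_rate**0, the oldest decay_rate**(n - 1).
    weights = decay_rate ** np.arange(n - 1, -1, -1)
    return np.average(values, weights=weights)


rng = np.random.default_rng(0)
demo_prices = 100 + rng.normal(0, 1, size=36).cumsum()  # 36 = 3 x span

demo_ema = ewma_window(demo_prices, span=12)
pandas_ema = pd.Series(demo_prices).ewm(span=12, adjust=True).mean().iloc[-1]
print(demo_ema, pandas_ema)
```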

## Step 6: Run Pipeline

In [None]:
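# Define the requested date range (roughly the last month of 2023)
requested_end = pd.Timestamp('2023-12-31')
requested_start = requested_end - pd.Timedelta(days=30)

# run_pipeline needs start/end dates that are trading sessions, and
# 2023-12-31 is a Sunday, so snap the range to NYSE sessions.
# zipline-reloaded 3.x uses tz-naive session labels; older releases used tz='UTC'.
trading_days = trading_calendar.sessions_in_range(requested_start, requested_end)
start_date, end_date = trading_days[0], trading_days[-1]

print("Running pipeline...")
print(f"Date range: {start_date.date()} to {end_date.date()}")
print(f"Trading days: {len(trading_days)}\n")

# Run pipeline. The engine also loads history before start_date for long
# windows (e.g. the 200-day SMA), so the bundle must cover that lookback.
results = engine.run_pipeline(pipeline, start_date, end_date)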

print(f"✓ Pipeline complete!")
print(f"  Result shape: {results.shape}")
print(f"  Unique stocks: {len(results.index.get_level_values(1).unique())}")
print(f"  Date range: {results.index.get_level_values(0).min().date()} to {results.index.get_level_values(0).max().date()}")
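In recent zipline-reloaded releases, `run_pipeline` rejects start or end dates that are not trading sessions for the pipeline's domain. Given any session index, an arbitrary calendar range can be clipped to its first and last sessions. `snap_to_sessions` below is a hypothetical helper, and `pd.bdate_range` is a weekday-only stand-in for the NYSE calendar (it ignores holidays).

```python
import pandas as pd


def snap_to_sessions(start, end, sessions):
    """Clip [start, end] to the first and last trading sessions inside it."""
    sessions = pd.DatetimeIndex(sessions)
    inside = sessions[(sessions >= pd.Timestamp(start)) & (sessions <= pd.Timestamp(end))]
    if inside.empty:
        raise ValueError("no trading sessions in the requested range")
    return inside[0], inside[-1]


# Stand-in calendar: weekdays only. A real NYSE calendar also drops holidays.
demo_sessions = pd.bdate_range("2023-11-01", "2024-01-31")
snapped_start, snapped_end = snap_to_sessions("2023-12-01", "2023-12-31", demo_sessions)
print(snapped_start.date(), snapped_end.date())  # Sunday 2023-12-31 -> Friday 2023-12-29
```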

## Step 7: Explore Results

In [None]:
# Get latest data (most recent trading day)
latest_date = results.index.get_level_values(0).max()
latest = results.xs(latest_date, level=0)

print(f"\n=== LATEST DATA ({latest_date.date()}) ===")
print(f"\nStocks analyzed: {len(latest)}\n")

# Display sample
print("Sample data (first 10 stocks):")
display_cols = ['close', 'momentum_20d', 'rsi', 'volatility', 'above_sma20', 'golden_cross']
print(latest[display_cols].head(10))

In [None]:
# Summary statistics
print("\n=== SUMMARY STATISTICS ===\n")
print(latest[['close', 'momentum_20d', 'rsi', 'volatility', 'returns_20d']].describe())
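Pipeline output is a DataFrame with a two-level (date, asset) index. `xs(..., level=0)` pulls one day's cross-section, and `groupby(level=0)` aggregates per date, which is how the signal-count plot in Step 9 works. The toy frame below has that layout; the tickers and values are synthetic, and levels are accessed by position as in the cells above.

```python
import pandas as pd

# Synthetic frame with the same layout as run_pipeline output.
toy_index = pd.MultiIndex.from_product(
    [pd.to_datetime(["2023-12-27", "2023-12-28", "2023-12-29"]), ["AAA", "BBB", "CCC"]]
)
toy_results = pd.DataFrame(
    {
        "rsi": [25, 55, 75, 28, 60, 72, 35, 65, 80],
        "momentum_20d": [-0.02, 0.03, 0.08, -0.01, 0.04, 0.09, 0.01, 0.05, 0.10],
    },
    index=toy_index,
)

# Cross-section for the latest date: one row per asset.
toy_latest_date = toy_results.index.get_level_values(0).max()
toy_latest = toy_results.xs(toy_latest_date, level=0)

# Per-date aggregates: summing a boolean Series counts matching assets.
toy_per_day = pd.DataFrame({
    "n_rsi_below_30": (toy_results["rsi"] < 30).groupby(level=0).sum(),
    "median_momentum": toy_results["momentum_20d"].groupby(level=0).median(),
})
print(toy_latest)
print(toy_per_day)
```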

## Step 8: Stock Screening

Let's find interesting trading opportunities:

In [None]:
# Screen 1: Strong momentum + bullish trend
strong_momentum = latest[
    (latest['momentum_20d'] > 0.05) &  # 5%+ gain in 20 days
    latest['above_sma20'] &
    (latest['rsi'] < 70)  # Not overbought
].sort_values('momentum_20d', ascending=False)

print("\n=== STRONG MOMENTUM STOCKS ===")
print(f"Found: {len(strong_momentum)} stocks\n")
if len(strong_momentum) > 0:
    print(strong_momentum[['close', 'momentum_20d', 'rsi', 'volatility']].head(10))

In [None]:
# Screen 2: Golden-cross regime (SMA50 above SMA200, bullish)
golden_cross_stocks = latest[latest['golden_cross']].copy()
golden_cross_stocks = golden_cross_stocks.sort_values('momentum_60d', ascending=False)

print("\n=== GOLDEN CROSS STOCKS ===")
print(f"Found: {len(golden_cross_stocks)} stocks\n")
if len(golden_cross_stocks) > 0:
    print(golden_cross_stocks[['close', 'sma_50', 'sma_200', 'momentum_60d']].head(10))

In [None]:
# Screen 3: Oversold (potential bounce)
# RSI below 35 (slightly looser than the rsi_oversold threshold of 30) while the
# longer-term trend is still up (SMA50 above SMA200)
oversold = latest[
    (latest['rsi'] < 35) &
    latest['golden_cross']
].sort_values('rsi')

print("\n=== OVERSOLD STOCKS (Potential Bounce) ===")
print(f"Found: {len(oversold)} stocks\n")
if len(oversold) > 0:
    print(oversold[['close', 'rsi', 'momentum_20d', 'volatility']].head(10))

In [None]:
# Screen 4: Low volatility + positive momentum
low_vol_momentum = latest[
    (latest['volatility'] < latest['volatility'].quantile(0.25)) &  # Bottom 25% volatility
    (latest['momentum_20d'] > 0)
].sort_values('momentum_20d', ascending=False)

print("\n=== LOW VOLATILITY + POSITIVE MOMENTUM ===")
print(f"Found: {len(low_vol_momentum)} stocks\n")
if len(low_vol_momentum) > 0:
    print(low_vol_momentum[['close', 'momentum_20d', 'volatility', 'rsi']].head(10))
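The screens above are boolean masks over the same cross-section, so they can also be collected into one membership table to see how they overlap. The sketch below uses a synthetic `toy_latest` frame (tickers and values are invented) with the same column names as `latest`. The oversold mask uses `golden_cross` (SMA50 above SMA200) as its trend condition, and the golden-cross screen is left out for brevity.

```python
import pandas as pd

# Synthetic cross-section shaped like `latest` (one row per asset).
toy_latest = pd.DataFrame(
    {
        "momentum_20d": [0.08, 0.02, -0.03, 0.12, 0.06],
        "rsi": [62, 45, 28, 75, 33],
        "volatility": [0.020, 0.008, 0.010, 0.030, 0.015],
        "above_sma20": [True, True, False, True, True],
        "golden_cross": [True, False, True, True, True],
    },
    index=["AAA", "BBB", "CCC", "DDD", "EEE"],
)

# Each screen is a boolean Series aligned on the asset index.
toy_screens = {
    "strong_momentum": (toy_latest["momentum_20d"] > 0.05)
    & toy_latest["above_sma20"]
    & (toy_latest["rsi"] < 70),
    "oversold_in_uptrend": (toy_latest["rsi"] < 35) & toy_latest["golden_cross"],
    "low_vol_momentum": (toy_latest["volatility"] < toy_latest["volatility"].quantile(0.25))
    & (toy_latest["momentum_20d"] > 0),
}
toy_membership = pd.DataFrame(toy_screens)
print(toy_membership)
print(toy_membership.sum())        # number of assets passing each screen
print(toy_membership.any(axis=1))  # assets passing at least one screen
```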

## Step 9: Visualizations

In [None]:
# Distribution plots
fig, axes = plt.subplots(2, 2, figsize=(16, 10))
fig.suptitle('Market Factor Distributions', fontsize=16, fontweight='bold')

# Momentum distribution
axes[0, 0].hist(latest['momentum_20d'].dropna(), bins=50, alpha=0.7, edgecolor='black')
axes[0, 0].axvline(0, color='red', linestyle='--', linewidth=2)
axes[0, 0].set_title('20-Day Momentum Distribution')
axes[0, 0].set_xlabel('Momentum')
axes[0, 0].set_ylabel('Frequency')

# RSI distribution
axes[0, 1].hist(latest['rsi'].dropna(), bins=50, alpha=0.7, edgecolor='black', color='orange')
axes[0, 1].axvline(30, color='green', linestyle='--', linewidth=2, label='Oversold')
axes[0, 1].axvline(70, color='red', linestyle='--', linewidth=2, label='Overbought')
axes[0, 1].set_title('RSI Distribution')
axes[0, 1].set_xlabel('RSI')
axes[0, 1].set_ylabel('Frequency')
axes[0, 1].legend()

# Volatility distribution
axes[1, 0].hist(latest['volatility'].dropna(), bins=50, alpha=0.7, edgecolor='black', color='purple')
axes[1, 0].set_title('Volatility Distribution')
axes[1, 0].set_xlabel('Volatility')
axes[1, 0].set_ylabel('Frequency')

# Momentum vs RSI scatter
axes[1, 1].scatter(latest['momentum_20d'], latest['rsi'], alpha=0.5)
axes[1, 1].axhline(50, color='gray', linestyle='--', alpha=0.5)
axes[1, 1].axvline(0, color='gray', linestyle='--', alpha=0.5)
axes[1, 1].set_title('Momentum vs RSI')
axes[1, 1].set_xlabel('20-Day Momentum')
axes[1, 1].set_ylabel('RSI')

plt.tight_layout()
plt.show()

In [None]:
# Signal counts over time
signal_counts = results.groupby(level=0)[['golden_cross', 'death_cross', 'rsi_oversold', 'rsi_overbought']].sum()

fig, ax = plt.subplots(figsize=(14, 6))
signal_counts.plot(ax=ax, linewidth=2, marker='o')
ax.set_title('Trading Signals Over Time', fontsize=14, fontweight='bold')
ax.set_xlabel('Date')
ax.set_ylabel('Number of Stocks')
ax.legend(['Golden Cross', 'Death Cross', 'RSI Oversold', 'RSI Overbought'])
ax.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

## Step 10: Export Results

Export data for further analysis or reporting:

In [None]:
import os

# Export latest screening results
output_dir = '/data/research_output'  # change to any writable directory
os.makedirs(output_dir, exist_ok=True)

# Export strong momentum stocks
if len(strong_momentum) > 0:
    filename = f"{output_dir}/strong_momentum_{latest_date.date()}.csv"
    strong_momentum.to_csv(filename)
    print(f"✓ Exported strong momentum stocks to: {filename}")

# Export golden cross stocks
if len(golden_cross_stocks) > 0:
    filename = f"{output_dir}/golden_cross_{latest_date.date()}.csv"
    golden_cross_stocks.to_csv(filename)
    print(f"✓ Exported golden cross stocks to: {filename}")

# Export oversold stocks
if len(oversold) > 0:
    filename = f"{output_dir}/oversold_{latest_date.date()}.csv"
    oversold.to_csv(filename)
    print(f"✓ Exported oversold stocks to: {filename}")

# Export all latest data
filename = f"{output_dir}/all_stocks_{latest_date.date()}.csv"
latest.to_csv(filename)
print(f"✓ Exported all stocks data to: {filename}")
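The export cell repeats the same pattern for each screen. A small loop over a dict of named results avoids that repetition. `export_screens` is a hypothetical helper, and the demo writes to a temporary directory rather than `/data/research_output`.

```python
import tempfile
from pathlib import Path

import pandas as pd


def export_screens(screens, out_dir, as_of):
    """Write each non-empty screen to <out_dir>/<name>_<YYYY-MM-DD>.csv.

    Returns the list of paths written.
    """
    out_dir = Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    written = []
    for name, frame in screens.items():
        if frame.empty:
            continue  # skip screens with no matches, as the cell above does
        path = out_dir / f"{name}_{pd.Timestamp(as_of).date()}.csv"
        frame.to_csv(path)
        written.append(path)
    return written


# Usage with synthetic frames: only the non-empty screen is written.
demo_screens = {
    "strong_momentum": pd.DataFrame({"close": [101.5]}, index=["AAA"]),
    "oversold": pd.DataFrame(columns=["close"]),
}
with tempfile.TemporaryDirectory() as tmp:
    demo_paths = export_screens(demo_screens, tmp, "2023-12-29")
    print([p.name for p in demo_paths])
```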

## Step 11: Time Series Analysis for Specific Stocks

In [None]:
# Pick top momentum stocks to analyze
if len(strong_momentum) > 0:
    # The index holds zipline Equity objects, not integer sids
    top_stocks = strong_momentum.head(4).index
    
    fig, axes = plt.subplots(2, 2, figsize=(16, 10))
    fig.suptitle('Top Momentum Stocks - Time Series Analysis', fontsize=16, fontweight='bold')
    
    for idx, asset in enumerate(top_stocks):
        ax = axes[idx // 2, idx % 2]
        
        # Get time series data for this stock (level 1 of the index is the asset)
        stock_data = results.xs(asset, level=1)
        
        # Plot close price with moving averages
        ax.plot(stock_data.index, stock_data['close'], label='Close', linewidth=2)
        ax.plot(stock_data.index, stock_data['sma_20'], label='SMA 20', linestyle='--', alpha=0.7)
        ax.plot(stock_data.index, stock_data['sma_50'], label='SMA 50', linestyle='--', alpha=0.7)
        
        ax.set_title(f"{asset.symbol} - Price & Moving Averages", fontweight='bold')
        ax.set_xlabel('Date')
        ax.set_ylabel('Price ($)')
        ax.legend()
        ax.grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()
else:
    print("No strong momentum stocks found in this period")
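To compare several stocks on one chart instead of one subplot each, the asset level of the index can be unstacked into columns and each series rebased to its first value. The sketch uses synthetic closes for two made-up tickers. With the real data, `results['close']` has the same (date, asset) layout, although assets that drop out of the liquidity screen on some days will show NaN after unstacking.

```python
import pandas as pd

# Synthetic (date, asset) results with a `close` column.
toy_index = pd.MultiIndex.from_product(
    [pd.to_datetime(["2023-12-27", "2023-12-28", "2023-12-29"]), ["AAA", "BBB"]]
)
toy_results = pd.DataFrame({"close": [10.0, 20.0, 10.5, 19.0, 11.0, 19.5]}, index=toy_index)

# unstack(level=1) turns assets into columns: one row per date.
toy_closes = toy_results["close"].unstack(level=1)
toy_normalized = toy_closes / toy_closes.iloc[0]  # each series rebased to 1.0
print(toy_closes)
print(toy_normalized.iloc[-1])  # growth over the window per asset
```

Calling `toy_normalized.plot()` would then draw all rebased series on a single axis.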

## Summary

### What We Did

1. ✅ Loaded Sharadar/Yahoo bundle data
2. ✅ Built a Pipeline with 26 columns (technical factors plus boolean signals)
3. ✅ Ran Pipeline queries on historical data
4. ✅ Screened stocks with multiple strategies:
   - Strong momentum
   - Golden cross
   - Oversold (RSI)
   - Low volatility + positive momentum
5. ✅ Visualized factor distributions
6. ✅ Tracked signals over time
7. ✅ Exported results to CSV
8. ✅ Analyzed time series for top stocks

### Next Steps

- Refine screening criteria based on results
- Backtest strategies using `run_algorithm()`
- Add fundamental factors (if using Sharadar SF1)
- Automate daily research reports
- Integrate with trading signals

### Resources

- [Pipeline Tutorial](https://zipline.ml4trading.io/beginner-tutorial.html)
- [Custom Factors](https://zipline.ml4trading.io/api-reference.html#custom-factors)
- [Backtest Notebook](./06_sharadar_professional_backtesting.ipynb)
- [Bundle Documentation](../docs/BUNDLES.md)