# Alpaca PySpark Connector Demonstration

This notebook demonstrates how to use the Alpaca Historical Bars PySpark Connector to fetch and analyze stock market data using distributed computation.

## Prerequisites

1. **Alpaca API Credentials**: You need an Alpaca account and API credentials
   - Sign up at [Alpaca](https://alpaca.markets/)
   - Generate API keys from your dashboard
   - Set environment variables `ALPACA_API_KEY` and `ALPACA_SECRET_KEY`

2. **Required Libraries**: This notebook requires PySpark, requests, pandas, matplotlib, and seaborn
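
Before creating the connector, it can help to confirm that both credential variables are actually set. The following is a minimal sketch using only the standard library; `missing_credentials` is a hypothetical helper written for this notebook, not part of the connector.

```python
import os

# Names taken from the prerequisites above
REQUIRED_VARS = ("ALPACA_API_KEY", "ALPACA_SECRET_KEY")


def missing_credentials(env=None):
    """Return the names of required variables that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_VARS if not env.get(name)]


missing = missing_credentials()
if missing:
    print(f"Missing environment variables: {', '.join(missing)}")
else:
    print("Alpaca credentials found in the environment")
```

Passing a plain dictionary as `env` makes the check easy to exercise without touching the real environment.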

In [None]:
# Install required packages (uncomment if running in a fresh environment)
# !pip install pyspark requests matplotlib seaborn pandas

import os
import warnings
from datetime import datetime, timedelta

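# Suppress Python warnings for cleaner output (Spark's own log level is set once the session exists)
warnings.filterwarnings('ignore')
# Must be set before the SparkSession is created. spark-sql already ships with PySpark,
# so this line can usually be dropped; if kept, match the version to the installed PySpark.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql_2.12:3.5.0 pyspark-shell'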

## Setup Spark Session and Connector

First, we'll initialize a Spark session and create our Alpaca connector.

In [None]:
from pyspark.sql import SparkSession
from src.alpaca_connector import create_connector

# Create Spark session with adaptive query execution (AQE) and partition coalescing enabled
spark = SparkSession.builder \
    .appName("AlpacaHistoricalBarsDemo") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .getOrCreate()

# Set log level to reduce verbose output
spark.sparkContext.setLogLevel("WARN")

print(f"Spark version: {spark.version}")
print(f"Default parallelism (equals the core count in local mode): {spark.sparkContext.defaultParallelism}")

In [None]:
# Create the Alpaca connector
# Note: Make sure ALPACA_API_KEY and ALPACA_SECRET_KEY environment variables are set
# For demonstration purposes, you can also pass them directly:
# connector = create_connector(spark, api_key="your_key", api_secret="your_secret")

connector = create_connector(
    spark,
    # Optional: customize configuration
    page_size=10000,  # Maximum records per API call
    date_split_days=30,  # Split date ranges into chunks of this many days
    max_retries=3,  # Number of retry attempts for failed requests
    timeout=30  # Request timeout in seconds
)

# Test connection
if connector.validate_connection():
    print("✅ Successfully connected to Alpaca API")
else:
    print("❌ Failed to connect to Alpaca API")
    print("Please check your API credentials and network connection")
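
The `max_retries` option suggests a retry loop around each request. The connector's implementation is not shown in this notebook, so the following is only a sketch of one common approach, exponential backoff with a little jitter. `call_with_retries` and `flaky` are hypothetical names used for illustration.

```python
import random
import time


def call_with_retries(fn, max_retries=3, base_delay=0.5, sleep=time.sleep):
    """Call fn(); on failure wait with exponential backoff and retry up to max_retries times."""
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except Exception:  # a real client would catch only network or HTTP errors
            if attempt == max_retries:
                raise
            # Waits of roughly 0.5s, 1s, 2s, ... plus up to 0.1s of random jitter
            sleep(base_delay * 2 ** attempt + random.uniform(0, 0.1))


# Stand-in for an API call that fails twice and then succeeds
calls = {"count": 0}


def flaky():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    return "ok"


# sleep is replaced with a no-op so the demo does not actually wait
result = call_with_retries(flaky, max_retries=3, sleep=lambda seconds: None)
print(result, "after", calls["count"], "calls")
```

Injecting `sleep` keeps the function testable; in real use the default `time.sleep` applies.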

## Fetching Historical Data

Let's fetch historical data for some popular stocks and demonstrate the distributed processing capabilities.

In [None]:
# Define the stocks we want to analyze
symbols = ['AAPL', 'GOOGL', 'MSFT', 'AMZN', 'TSLA']

# Define date range (last 3 months)
end_date = datetime.now().strftime('%Y-%m-%d')
start_date = (datetime.now() - timedelta(days=90)).strftime('%Y-%m-%d')

print(f"Fetching data for {len(symbols)} symbols from {start_date} to {end_date}")
print(f"Symbols: {', '.join(symbols)}")

# Fetch the data using distributed processing
df = connector.get_historical_bars(
    symbols=symbols,
    start_date=start_date,
    end_date=end_date,
    timeframe='1Day'  # Daily bars
)

from pyspark.sql import functions as F
first_ts, last_ts = df.agg(F.min("timestamp"), F.max("timestamp")).first()  # one job for both bounds
print(f"\n📊 Retrieved {df.count()} total bars")
print(f"📅 Date range: {first_ts} to {last_ts}")
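
Because `page_size` caps the number of records per API call, a long date range generally needs several calls. The sketch below shows token-based pagination against an in-memory stand-in for the API. The response fields `bars` and `next_page_token` and the helper names are assumptions made for illustration; the connector's real request logic is not shown in this notebook.

```python
def fetch_all_pages(fetch_page, page_size):
    """Collect rows from a paginated endpoint until no page token is returned."""
    rows, token = [], None
    while True:
        page = fetch_page(page_size=page_size, page_token=token)
        rows.extend(page["bars"])
        token = page.get("next_page_token")
        if not token:
            return rows


def make_fake_endpoint(data):
    """Build an in-memory replacement for the HTTP call, paging through `data`."""
    def fetch_page(page_size, page_token=None):
        start = int(page_token or 0)
        end = start + page_size
        next_token = str(end) if end < len(data) else None
        return {"bars": data[start:end], "next_page_token": next_token}
    return fetch_page


# 25 fake records fetched 10 at a time take three calls
bars = fetch_all_pages(make_fake_endpoint(list(range(25))), page_size=10)
print(f"Fetched {len(bars)} records")
```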

In [None]:
# Display basic information about the dataset
print("Dataset Schema:")
df.printSchema()

print("\nFirst 10 rows:")
df.show(10, truncate=False)

In [None]:
# Basic statistics by symbol
print("📈 Basic Statistics by Symbol:")
stats_df = df.groupBy("symbol").agg(
    {"volume": "sum", "close": "avg", "high": "max", "low": "min", "*": "count"}
).withColumnRenamed("count(1)", "days_traded")

stats_df.orderBy("symbol").show()

## Data Analysis and Visualization

Now let's perform some analysis and create visualizations of our data.

In [None]:
# Convert to Pandas for easier visualization
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# Convert Spark DataFrame to Pandas (toPandas() collects every row to the driver, so use it only on data that fits in driver memory)
pandas_df = df.orderBy("symbol", "timestamp").toPandas()

# Convert timestamp to datetime if it's not already
pandas_df['timestamp'] = pd.to_datetime(pandas_df['timestamp'])

print(f"Converted {len(pandas_df)} rows to Pandas DataFrame")
print(f"Memory usage: {pandas_df.memory_usage(deep=True).sum() / 1024**2:.2f} MB")

In [None]:
# Set up the plotting style
plt.style.use('seaborn-v0_8')
plt.rcParams['figure.figsize'] = (15, 10)

# Create a comprehensive dashboard
fig, axes = plt.subplots(2, 2, figsize=(20, 15))
fig.suptitle('Alpaca Historical Data Analysis Dashboard', fontsize=16, fontweight='bold')

# 1. Price trends over time
ax1 = axes[0, 0]
for symbol in symbols:
    symbol_data = pandas_df[pandas_df['symbol'] == symbol]
    ax1.plot(symbol_data['timestamp'], symbol_data['close'], label=symbol, linewidth=2)

ax1.set_title('Stock Price Trends (Close)', fontweight='bold')
ax1.set_xlabel('Date')
ax1.set_ylabel('Price ($)')
ax1.legend()
ax1.grid(True, alpha=0.3)

# 2. Volume comparison
ax2 = axes[0, 1]
volume_by_symbol = pandas_df.groupby('symbol')['volume'].sum().sort_values(ascending=True)
bars = ax2.barh(volume_by_symbol.index, volume_by_symbol.values / 1e6)  # Convert to millions
ax2.set_title('Total Trading Volume (3 Months)', fontweight='bold')
ax2.set_xlabel('Volume (Millions)')

# Add value labels on bars
for i, bar in enumerate(bars):
    width = bar.get_width()
    ax2.text(width + width*0.01, bar.get_y() + bar.get_height()/2, 
             f'{width:.0f}M', ha='left', va='center')

# 3. Price volatility (daily returns)
ax3 = axes[1, 0]
returns_data = []
for symbol in symbols:
    symbol_data = pandas_df[pandas_df['symbol'] == symbol].sort_values('timestamp')
    daily_returns = symbol_data['close'].pct_change().dropna() * 100
    returns_data.append(daily_returns.values)

ax3.boxplot(returns_data)
ax3.set_xticklabels(symbols)  # set labels separately; boxplot's `labels` argument is deprecated in Matplotlib 3.9+
ax3.set_title('Daily Returns Distribution (%)', fontweight='bold')
ax3.set_xlabel('Symbol')
ax3.set_ylabel('Daily Return (%)')
ax3.grid(True, alpha=0.3)
ax3.axhline(y=0, color='red', linestyle='--', alpha=0.5)

# 4. Average daily range (High - Low)
ax4 = axes[1, 1]
pandas_df['daily_range_pct'] = ((pandas_df['high'] - pandas_df['low']) / pandas_df['close']) * 100
avg_range = pandas_df.groupby('symbol')['daily_range_pct'].mean().sort_values()

bars = ax4.bar(avg_range.index, avg_range.values, color='skyblue', alpha=0.7)
ax4.set_title('Average Daily Price Range (%)', fontweight='bold')
ax4.set_xlabel('Symbol')
ax4.set_ylabel('Average Daily Range (%)')

# Add value labels on bars
for bar in bars:
    height = bar.get_height()
    ax4.text(bar.get_x() + bar.get_width()/2., height + height*0.01,
             f'{height:.1f}%', ha='center', va='bottom')

plt.tight_layout()
plt.show()

## Advanced Analysis with PySpark

Let's perform some advanced analysis using PySpark's distributed computing capabilities.

In [None]:
from pyspark.sql.functions import col, lag, when, avg, stddev, max as spark_max, min as spark_min
from pyspark.sql.window import Window

# Define window specifications for technical analysis
window_spec = Window.partitionBy("symbol").orderBy("timestamp")

# Calculate technical indicators
# The first bar of each symbol has no previous close, so its return is left as null
# rather than 0; a 0 would be indistinguishable from a genuinely flat day.
technical_df = df.withColumn(
    "prev_close", lag("close").over(window_spec)
).withColumn(
    "daily_return_pct",
    ((col("close") - col("prev_close")) / col("prev_close")) * 100
).withColumn(
    "daily_range_pct",
    ((col("high") - col("low")) / col("close")) * 100
)

print("Technical indicators calculated. Sample data:")
technical_df.select("symbol", "timestamp", "close", "daily_return_pct", "daily_range_pct") \
    .filter(col("daily_return_pct").isNotNull()) \
    .show(10, truncate=False)
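
The daily return formula is easy to check by hand on a short list of closes. This plain-Python sketch mirrors the `lag`-based calculation; `daily_returns_pct` is a hypothetical helper, and using `None` for the first element is a choice made in this sketch so that a missing value stays distinct from a flat day.

```python
def daily_returns_pct(closes):
    """Percentage change from each close to the next; None where no previous close exists."""
    if not closes:
        return []
    returns = [None]  # the first bar has nothing to compare against
    for prev, curr in zip(closes, closes[1:]):
        returns.append((curr - prev) / prev * 100)
    return returns


# The third bar repeats the previous close, so its return is a real 0.0
for close, ret in zip([100.0, 102.0, 102.0, 96.9], daily_returns_pct([100.0, 102.0, 102.0, 96.9])):
    print(close, "n/a" if ret is None else f"{ret:+.2f}%")
```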

In [None]:
# Calculate rolling averages using window functions
from pyspark.sql.functions import avg

# Define rolling windows
window_5d = Window.partitionBy("symbol").orderBy("timestamp").rowsBetween(-4, 0)
window_20d = Window.partitionBy("symbol").orderBy("timestamp").rowsBetween(-19, 0)

# Add moving averages
ma_df = technical_df.withColumn(
    "ma_5", avg("close").over(window_5d)
).withColumn(
    "ma_20", avg("close").over(window_20d)
)

# Find trading signals (when 5-day MA crosses above 20-day MA)
# Note: rowsBetween averages whatever rows exist, so ma_20 is based on fewer than
# 20 bars during each symbol's first 19 rows; signals in that stretch are unreliable.
signals_df = ma_df.withColumn(
    "prev_ma_5", lag("ma_5").over(window_spec)
).withColumn(
    "prev_ma_20", lag("ma_20").over(window_spec)
).withColumn(
    "golden_cross",
    when(
        (col("prev_ma_5") <= col("prev_ma_20")) & (col("ma_5") > col("ma_20")),
        True
    ).otherwise(False)
)

# Show golden cross signals
golden_crosses = signals_df.filter(col("golden_cross")) \
    .select("symbol", "timestamp", "close", "ma_5", "ma_20")

print("🎯 Golden Cross Signals (5-day MA crossing above 20-day MA):")
golden_crosses.show(truncate=False)
print(f"Total signals found: {golden_crosses.count()}")
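
The crossover rule can be traced on a handful of numbers without Spark. In this sketch, `trailing_mean` and `golden_crosses` are hypothetical helpers. Unlike the window functions above, the sketch yields no average until a window is full, which is a deliberate difference that avoids signals computed from partial windows.

```python
def trailing_mean(values, window):
    """Mean of the last `window` values at each position; None until the window is full."""
    out = []
    for i in range(len(values)):
        if i + 1 < window:
            out.append(None)
        else:
            out.append(sum(values[i + 1 - window:i + 1]) / window)
    return out


def golden_crosses(closes, short=5, long=20):
    """Indices where the short average moves from at-or-below to above the long average."""
    short_ma = trailing_mean(closes, short)
    long_ma = trailing_mean(closes, long)
    hits = []
    for i in range(1, len(closes)):
        needed = (short_ma[i - 1], long_ma[i - 1], short_ma[i], long_ma[i])
        if any(value is None for value in needed):
            continue  # skip positions where either window is still filling
        if short_ma[i - 1] <= long_ma[i - 1] and short_ma[i] > long_ma[i]:
            hits.append(i)
    return hits


# Small windows keep the example short: prices fall, then recover
prices = [10, 9, 8, 7, 6, 7, 9, 12]
print(golden_crosses(prices, short=2, long=4))
```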

In [None]:
# Risk analysis: Calculate volatility metrics
# Exclude only each symbol's first bar (no previous close); flat days with a 0% return stay in
risk_metrics = technical_df.filter(col("prev_close").isNotNull()) \
    .groupBy("symbol") \
    .agg(
        avg("daily_return_pct").alias("avg_daily_return"),
        stddev("daily_return_pct").alias("volatility"),
        spark_max("daily_return_pct").alias("max_daily_gain"),
        spark_min("daily_return_pct").alias("max_daily_loss"),
        avg("volume").alias("avg_volume")
    )

print("📊 Risk Metrics Analysis:")
risk_metrics.orderBy("volatility").show(truncate=False)
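
Spark's `stddev` is the sample standard deviation (an alias of `stddev_samp`, dividing by n - 1). The same summary can be reproduced for a single symbol with the standard library, which is a convenient way to sanity-check one row of the table. `risk_summary` is a hypothetical helper, and the input values are made up for illustration.

```python
from statistics import mean, stdev


def risk_summary(returns_pct):
    """Summarize daily returns, ignoring entries with no previous close (None)."""
    observed = [r for r in returns_pct if r is not None]
    return {
        "avg_daily_return": mean(observed),
        "volatility": stdev(observed),  # sample standard deviation; needs at least two values
        "max_daily_gain": max(observed),
        "max_daily_loss": min(observed),
    }


summary = risk_summary([None, 1.0, -2.0, 0.0, 3.0])
for name, value in summary.items():
    print(f"{name}: {value:.4f}")
```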

## Performance Analysis

Let's analyze the performance of our distributed data loading approach.
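
How `date_split_days` changes the number of fetch tasks can be sketched without Spark. The helper below splits an inclusive date range into chunks and pairs each chunk with each symbol. `split_date_range` is hypothetical, and the connector may draw its chunk boundaries differently, so treat the counts as an illustration rather than a description of its internals.

```python
from datetime import date, timedelta
from itertools import product


def split_date_range(start, end, chunk_days):
    """Split [start, end] into consecutive inclusive chunks of at most chunk_days days."""
    if chunk_days < 1:
        raise ValueError("chunk_days must be at least 1")
    chunks = []
    cursor = start
    while cursor <= end:
        chunk_end = min(cursor + timedelta(days=chunk_days - 1), end)
        chunks.append((cursor, chunk_end))
        cursor = chunk_end + timedelta(days=1)
    return chunks


symbols_demo = ["AAPL", "GOOGL"]
for chunk_days in (30, 7):
    chunks = split_date_range(date(2024, 1, 1), date(2024, 1, 30), chunk_days)
    tasks = list(product(symbols_demo, chunks))
    print(f"date_split_days={chunk_days}: {len(chunks)} chunks per symbol, {len(tasks)} tasks")
```

More tasks allow more requests to run in parallel, but each task also carries scheduling and request overhead, which is why the smaller chunk size is not guaranteed to win.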

In [None]:
import time

# Test with different configurations to show scalability
print("🚀 Performance Testing: Different Configuration Settings\n")

test_symbols = ['AAPL', 'GOOGL']
test_start = (datetime.now() - timedelta(days=30)).strftime('%Y-%m-%d')
test_end = datetime.now().strftime('%Y-%m-%d')

# Test 1: Larger date chunks (fewer parallel tasks)
print("Test 1: Larger date chunks (30 days per chunk)")
connector_large_chunks = create_connector(spark, date_split_days=30)

start_time = time.time()
df_large = connector_large_chunks.get_historical_bars(test_symbols, test_start, test_end)
count_large = df_large.count()
time_large = time.time() - start_time

print(f"   Results: {count_large} bars in {time_large:.2f} seconds")

# Test 2: Smaller date chunks (more parallel tasks)
print("\nTest 2: Smaller date chunks (7 days per chunk)")
connector_small_chunks = create_connector(spark, date_split_days=7)

start_time = time.time()
df_small = connector_small_chunks.get_historical_bars(test_symbols, test_start, test_end)
count_small = df_small.count()
time_small = time.time() - start_time

print(f"   Results: {count_small} bars in {time_small:.2f} seconds")

# Summary
print("\n📈 Performance Summary:")
print(f"   Large chunks: {time_large:.2f}s")
print(f"   Small chunks: {time_small:.2f}s")
if time_small < time_large:
    # Time saved relative to the slower (large-chunk) run
    improvement = ((time_large - time_small) / time_large) * 100
    print(f"   ✅ Smaller chunks took {improvement:.1f}% less time")
else:
    # Time saved relative to the slower (small-chunk) run
    difference = ((time_small - time_large) / time_small) * 100
    print(f"   📊 Larger chunks took {difference:.1f}% less time")

print("\nNote: Each configuration is timed once, so results are indicative only. Performance depends on network conditions, API rate limits, and cluster resources.")
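
A single timed run is noisy, so a more stable comparison repeats each configuration and compares medians. The sketch below uses only the standard library; `time_runs` and `percent_faster` are hypothetical helpers, and the workload is a stand-in for a call such as fetching bars and counting them.

```python
import time
from statistics import median


def time_runs(fn, repeats=3):
    """Call fn `repeats` times and return the wall-clock duration of each call in seconds."""
    durations = []
    for _ in range(repeats):
        start = time.perf_counter()
        fn()
        durations.append(time.perf_counter() - start)
    return durations


def percent_faster(baseline_s, candidate_s):
    """Time saved by the candidate, as a percentage of the baseline duration."""
    return (baseline_s - candidate_s) / baseline_s * 100


# Stand-in workload; in the notebook this would wrap the fetch-and-count call
durations = time_runs(lambda: sum(range(100_000)), repeats=5)
print(f"median: {median(durations):.4f}s over {len(durations)} runs")
```

`time.perf_counter()` is preferred over `time.time()` for measuring intervals because it is monotonic and has higher resolution.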

## Exporting Results

Finally, let's export our analysis results to different formats.

In [None]:
# Export results to Parquet, CSV, and JSON
# Spark writes each path below as a directory of part files; coalesce(1) limits it to one part file
print("💾 Exporting data to various formats...")

# Create output directory
output_dir = "./output"
os.makedirs(output_dir, exist_ok=True)

# Export main dataset to Parquet
parquet_path = f"{output_dir}/historical_bars.parquet"
df.coalesce(1).write.mode("overwrite").parquet(parquet_path)
print(f"✅ Main dataset exported to: {parquet_path}")

# Export risk metrics to CSV
csv_path = f"{output_dir}/risk_metrics.csv"
risk_metrics.coalesce(1).write.mode("overwrite").option("header", "true").csv(csv_path)
print(f"✅ Risk metrics exported to: {csv_path}")

# Export golden cross signals to JSON (Spark writes JSON Lines: one object per line)
json_path = f"{output_dir}/golden_cross_signals.json"
golden_crosses.coalesce(1).write.mode("overwrite").json(json_path)
print(f"✅ Trading signals exported to: {json_path}")

print(f"\n📁 All files saved in: {os.path.abspath(output_dir)}")
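
Because Spark writes each export path as a directory, downstream tools that expect a single file need the part file inside it. The sketch below locates those files with the standard library. `find_part_files` is a hypothetical helper, and the temporary directory only imitates the layout Spark produces (a `_SUCCESS` marker next to `part-*` files).

```python
import tempfile
from pathlib import Path


def find_part_files(output_path, suffix):
    """List the data files inside a Spark output directory, skipping markers such as _SUCCESS."""
    return sorted(
        p for p in Path(output_path).glob(f"part-*{suffix}") if p.is_file()
    )


# Imitate the layout of a Spark CSV export in a throwaway directory
with tempfile.TemporaryDirectory() as tmp:
    out = Path(tmp) / "risk_metrics.csv"
    out.mkdir()
    (out / "_SUCCESS").touch()
    (out / "part-00000-example-c000.csv").write_text("symbol,volatility\n")
    names = [p.name for p in find_part_files(out, ".csv")]
    print(names)
```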

## Summary and Next Steps

This notebook demonstrated the key features of the Alpaca PySpark Connector:

### ✅ What we accomplished:
1. **Connected to Alpaca API** using API credentials read from environment variables
2. **Fetched historical data** for multiple symbols using distributed processing
3. **Performed technical analysis** with PySpark's window functions
4. **Created visualizations** showing price trends, volume, and volatility
5. **Identified trading signals** using moving average crossovers
6. **Analyzed performance** with different configuration settings
7. **Exported results** to multiple formats (Parquet, CSV, JSON)

### 🔧 Key Features:
- **Distributed Loading**: Automatic parallelization across date ranges and symbols
- **Flexible Configuration**: Customizable page sizes, retry logic, and chunking strategies  
- **Error Handling**: Robust retry mechanisms and rate limit handling
- **Multiple Timeframes**: Support for various timeframes (1Min, 5Min, 1Day, etc.)
- **Scalability**: Efficient processing of large datasets using Spark

### 🚀 Next Steps:
1. **Scale up**: Try with more symbols and longer date ranges
2. **Real-time integration**: Combine with streaming data sources
3. **Advanced analytics**: Implement more sophisticated trading strategies
4. **Model training**: Use the data for machine learning models
5. **Production deployment**: Set up automated data pipelines

### 📚 Resources:
- [Alpaca API Documentation](https://docs.alpaca.markets/)
- [PySpark SQL Guide](https://spark.apache.org/docs/latest/sql-programming-guide.html)
- [Connector Source Code](../src/alpaca_connector.py)
- [Test Suite](../tests/test_alpaca_connector.py)

In [None]:
# Clean up Spark session
print("🧹 Cleaning up Spark session...")
spark.stop()
print("✅ Spark session stopped successfully")