In [10]:
import alpaca_trade_api as tradeapi
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from sklearn.ensemble import RandomForestRegressor
import warnings
import time

warnings.filterwarnings("ignore")  # To suppress warnings for cleaner output

class StockAnalyzer:
    def __init__(self, alpaca_api_key, alpaca_secret_key, base_url='https://paper-api.alpaca.markets'):
        # Initialize Alpaca API
        self.api = tradeapi.REST(alpaca_api_key, alpaca_secret_key, base_url, api_version='v2')
        self.test_mode = False

    def get_all_symbols(self):
        # Fetch all active and tradable assets
        assets = self.api.list_assets(status='active')
        symbols = [
            asset.symbol for asset in assets
            if asset.tradable and asset.exchange in ['NYSE', 'NASDAQ']
        ]
        print(f"Total tradable symbols found: {len(symbols)}")
        return symbols

    def fetch_historical_data(self, symbols, start_date, end_date, time_frame, feed='iex', batch_size=100, aggregate_to_hour=False):
        # Adjust date formats to match API requirements
        start = start_date.strftime('%Y-%m-%dT%H:%M:%SZ')
        end = end_date.strftime('%Y-%m-%dT%H:%M:%SZ')

        all_data = pd.DataFrame()

        # Batch symbols to avoid exceeding rate limits
        for i in range(0, len(symbols), batch_size):
            batch_symbols = symbols[i:i + batch_size]
            success = False
            attempts = 0
            max_attempts = 5
            while not success and attempts < max_attempts:
                try:
                    bars = self.api.get_bars(
                        batch_symbols,
                        time_frame,
                        start=start,
                        end=end,
                        adjustment='raw',
                        feed=feed
                    ).df
                    if not bars.empty:
                        all_data = pd.concat([all_data, bars])
                        print(f"Fetched data for batch {i // batch_size + 1} with {len(batch_symbols)} symbols.")
                    else:
                        print(f"No data returned for batch {i // batch_size + 1} with symbols {batch_symbols[0]} to {batch_symbols[-1]}")
                    time.sleep(1)  # Pause to respect rate limits
                    success = True
                except tradeapi.rest.APIError as e:
                    if 'rate limit' in str(e).lower():
                        attempts += 1
                        wait_time = 3 * attempts
                        print(f"Rate limit exceeded. Sleeping for {wait_time} seconds and retrying (Attempt {attempts}/{max_attempts})...")
                        time.sleep(wait_time)
                    else:
                        print(f"Error fetching data for symbols {batch_symbols[0]} to {batch_symbols[-1]}: {e}")
                        break
                except Exception as e:
                    print(f"Unexpected error: {e}")
                    break
            if not success:
                print(f"Failed to fetch data for batch {i // batch_size + 1} after {max_attempts} attempts.")
            else:
                # Sleep between batches to respect rate limits
                time.sleep(2)  # Adjust sleep time as needed
        all_data.reset_index(inplace=True)

        if aggregate_to_hour and time_frame == tradeapi.TimeFrame.Minute:
            # Aggregate minute data to hourly data
            all_data['timestamp'] = all_data['timestamp'].dt.floor('H')
            all_data = all_data.groupby(['symbol', 'timestamp']).agg({
                'open': 'first',
                'high': 'max',
                'low': 'min',
                'close': 'last',
                'volume': 'sum',
                'trade_count': 'sum',
                'vwap': 'mean'
            }).reset_index()
        return all_data

    def prepare_features_for_horizon(self, data, horizon, time_frame):
        data = data.copy()
        # Sort data and reset index
        data = data.sort_values(['symbol', 'timestamp']).reset_index(drop=True)
        # Filter symbols with data less than data_threshold
        data_counts = data.groupby('symbol').size()
        valid_symbols = data_counts[data_counts >= self.data_threshold].index.tolist()
        data = data[data['symbol'].isin(valid_symbols)]
        # Compute features grouped by symbol
        data['return'] = data.groupby('symbol')['close'].pct_change()
        data['volatility'] = data.groupby('symbol')['close'].transform(lambda x: x.rolling(window=5).std())
        data['momentum'] = data.groupby('symbol')['close'].transform(lambda x: x / x.shift(5) - 1)
        data['ma5'] = data.groupby('symbol')['close'].transform(lambda x: x.rolling(window=5).mean())
        data['ma10'] = data.groupby('symbol')['close'].transform(lambda x: x.rolling(window=10).mean())
        data['ma_ratio'] = data['ma5'] / data['ma10']
        # Shift 'close' to create the target variable based on the horizon
        data['target'] = data.groupby('symbol')['close'].shift(-horizon)
        # Drop rows with any NaN values in the required columns
        data = data.dropna(subset=['return', 'volatility', 'momentum', 'ma_ratio', 'target'])
        return data

    def train_and_predict_for_horizon(self, data, horizon_name):
        if data.empty:
            print(f"No data available after feature preparation for {horizon_name}.")
            return {}
        # Separate the latest features per symbol for prediction
        latest_data = data.groupby('symbol').tail(1)
        # Remove the last data point per symbol from training data
        training_data = data.groupby('symbol').apply(lambda x: x.iloc[:-1]).reset_index(drop=True)
        # Ensure we have enough data to train
        if training_data.empty:
            print(f"No training data available for {horizon_name}.")
            return {}
        # Training features and target
        X_train = training_data[['return', 'volatility', 'momentum', 'ma_ratio']]
        y_train = training_data['target']
        # Remove infinite or NaN values
        X_train = X_train.replace([np.inf, -np.inf], np.nan).dropna()
        y_train = y_train.loc[X_train.index]
        # Train the model on combined data
        print(f"Training the model on combined data for {horizon_name}...")
        model = RandomForestRegressor(n_estimators=50, random_state=42, n_jobs=-1)
        model.fit(X_train, y_train)
        # Prepare test data (latest features)
        X_test = latest_data[['return', 'volatility', 'momentum', 'ma_ratio']]
        X_test = X_test.replace([np.inf, -np.inf], np.nan).dropna()
        symbols = X_test.index.get_level_values('symbol')
        current_prices = latest_data.loc[symbols]['close'].values
        # Predict expected prices
        predicted_prices = model.predict(X_test)
        # Calculate expected returns
        expected_returns = {}
        for i, symbol in enumerate(symbols):
            predicted_price = predicted_prices[i]
            current_price = current_prices[i]
            expected_return = (predicted_price - current_price) / current_price * 100
            expected_returns[symbol] = expected_return
        return expected_returns

    def run(self, test_mode=False, batch_size=100, data_threshold=60, top_stocks=10):
        self.test_mode = test_mode
        self.data_threshold = data_threshold  # Store data_threshold as an instance variable

        # Define time horizons and corresponding parameters
        horizons = {
            '1d': {'horizon': 1, 'start_days_ago': 60},
            '5d': {'horizon': 5, 'start_days_ago': 300},
            '1y': {'horizon': 252, 'start_days_ago': 700},
        }

        # Get all symbols
        symbols = self.get_all_symbols()

        # Limit symbols to avoid rate limits if necessary
        # symbols = symbols[:1000]  # Adjust as needed

        # Determine the earliest start_date needed among all horizons
        earliest_start_days_ago = max(params['start_days_ago'] for params in horizons.values())
        start_date = datetime.now() - timedelta(days=earliest_start_days_ago)
        end_date = datetime.now() - timedelta(days=1)

        # Fetch historical data for all symbols once
        print("Fetching historical data once for all batches and horizons...")
        data = self.fetch_historical_data(
            symbols,
            start_date,
            end_date,
            tradeapi.TimeFrame.Day,  # Assuming daily data for all horizons
            batch_size=batch_size,
            aggregate_to_hour=False
        )

        if data.empty:
            print("No historical data fetched.")
            return

        # Initialize a dictionary to hold expected returns for each horizon
        horizon_returns = {}

        for horizon_name, params in horizons.items():
            print(f"\nProcessing horizon: {horizon_name}")

            # Prepare features and target for this horizon
            data_horizon = data.copy()
            # Filter data for the required date range per horizon
            horizon_start_date = datetime.now() - timedelta(days=params['start_days_ago'])
            data_horizon = data_horizon[data_horizon['timestamp'] >= np.datetime64(horizon_start_date)]
            data_horizon = self.prepare_features_for_horizon(data_horizon, params['horizon'], tradeapi.TimeFrame.Day)

            if data_horizon.empty:
                print(f"No data available after feature preparation for {horizon_name}.")
                continue

            # Train model and predict expected returns
            expected_returns = self.train_and_predict_for_horizon(data_horizon, horizon_name)

            if not expected_returns:
                print(f"No expected returns calculated for {horizon_name}.")
                continue

            # Store the expected returns
            horizon_returns[horizon_name] = expected_returns

        # Combine results and rank stocks
        combined_returns = {}
        for symbol in symbols:
            symbol_returns = {horizon: horizon_returns.get(horizon, {}).get(symbol, None) for horizon in horizons if horizon in horizon_returns}
            if all(value is not None for value in symbol_returns.values()):
                combined_returns[symbol] = symbol_returns

        # Sort stocks based on average expected return across horizons
        ranked_stocks = sorted(
            combined_returns.items(),
            key=lambda x: sum(x[1].values()) / len(x[1]),
            reverse=True
        )

        print(f"\nTop {top_stocks} Stocks ranked by average expected return:")
        for symbol, returns in ranked_stocks[:top_stocks]:
            print(f"{symbol}: ", end="")
            for horizon in horizons:
                if horizon in returns:
                    print(f"{horizon}: {returns[horizon]:.2f}% ", end="")
                else:
                    print(f"{horizon}: N/A ", end="")
            print()

        return ranked_stocks
    
# Usage Example:
if __name__ == '__main__':
    alpaca_api_key = 'PKJATKYKR4NWCYALKL58' 
    alpaca_secret_key = '8aFRppO0HdabLAO8b05ZImUwSd0bEmv4hqpU0Hqs'

    # Initialize the analyzer
    analyzer = StockAnalyzer(alpaca_api_key, alpaca_secret_key)

    # Run in test mode to prevent actual trades
    ranked_stocks = analyzer.run(test_mode=True, batch_size=1000, data_threshold=30, top_stocks=100)
    

Total tradable symbols found: 7648
Fetching historical data once for all batches and horizons...


sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 se

Fetched data for batch 1 with 1000 symbols.


sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 se

Fetched data for batch 2 with 1000 symbols.


sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 se

Fetched data for batch 3 with 1000 symbols.


sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 se

Fetched data for batch 4 with 1000 symbols.


sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 se

Fetched data for batch 5 with 1000 symbols.


sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 se

Fetched data for batch 6 with 1000 symbols.


sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 se

Fetched data for batch 7 with 1000 symbols.


sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 seconds and retrying https://data.alpaca.markets/v2/stocks/bars 3 more time(s)...
sleep 3 se

Fetched data for batch 8 with 648 symbols.

Processing horizon: 1d


TypeError: Invalid comparison between dtype=datetime64[ns, UTC] and datetime64