In [None]:
import pandas as pd
import aiohttp
import asyncio
import logging
from datetime import datetime, timedelta
import pytz
from typing import Dict, List

In [None]:
from dotenv import load_dotenv
# Read key/value pairs from a .env file into the process environment
# (python-dotenv). Presumably this is where the Polygon API key is kept —
# TODO confirm against the code that calls run_scanner().
load_dotenv()

In [None]:
class PolygonPreMarketScanner:
    """Scan Polygon.io for NASDAQ/NYSE stocks trading above their previous close pre-market.

    The public entry point is :meth:`get_pre_market_gainers`; every other
    method is a helper that expects ``self.session`` to be open already.
    """

    # Aggregate bars (daily bars, previous close) are served under the /v2
    # prefix, unlike the /v3 endpoints reachable through ``base_url``.
    _AGGS_URL = "https://api.polygon.io/v2/aggs/ticker"

    def __init__(self, api_key: str):
        """Store the API key; the HTTP session is created lazily."""
        self.api_key = api_key
        self.base_url = "https://api.polygon.io/v3"
        self.logger = logging.getLogger(__name__)
        self.session = None

    async def _init_session(self):
        """Create the aiohttp session if there is none or the last one was closed."""
        if self.session is None or self.session.closed:
            self.session = aiohttp.ClientSession()

    async def _fetch_json(self, url: str, params: Dict):
        """GET ``url`` and return the decoded JSON body, or None on a non-200 status.

        The connection is released before returning, so callers never hold one
        response open while issuing the next request.
        """
        async with self.session.get(url, params=params) as response:
            if response.status != 200:
                return None
            return await response.json()

    def _get_market_cap_type(self, market_cap: float) -> str:
        """Return the size bucket for a market capitalization.

        ``None`` is treated like 0 (and therefore lands in 'Micro Cap'), which
        matches the default the caller uses when the value is missing.
        """
        if market_cap is None:
            market_cap = 0
        if market_cap >= 200e9:
            return 'Mega Cap'
        elif market_cap >= 10e9:
            return 'Large Cap'
        elif market_cap >= 2e9:
            return 'Mid Cap'
        elif market_cap >= 300e6:
            return 'Small Cap'
        else:
            return 'Micro Cap'

    @staticmethod
    def _summarize_volumes(volumes: List[float]) -> Dict:
        """Reduce a chronological (oldest first) list of daily volumes to metrics.

        Returns a dict with ``avg_vol_10``, ``avg_vol_30``, ``avg_vol_90`` and
        ``relative_vol_1d``. All values are None for an empty list;
        ``relative_vol_1d`` is also None when fewer than two days are available
        or when the two-day mean is zero.
        """
        if not volumes:
            return {
                'avg_vol_10': None,
                'avg_vol_30': None,
                'avg_vol_90': None,
                'relative_vol_1d': None
            }

        last_10 = volumes[-10:]
        last_30 = volumes[-30:]

        # Latest day relative to the mean of the last two days (latest included).
        relative_vol = None
        if len(volumes) >= 2:
            two_day_mean = sum(volumes[-2:]) / 2
            if two_day_mean:
                relative_vol = volumes[-1] / two_day_mean

        return {
            'avg_vol_10': sum(last_10) / len(last_10),
            'avg_vol_30': sum(last_30) / len(last_30),
            # Averaged over every bar returned for the 90-calendar-day window.
            'avg_vol_90': sum(volumes) / len(volumes),
            'relative_vol_1d': relative_vol
        }

    async def _get_volume_metrics(self, ticker: str) -> Dict:
        """Fetch daily bars for the last 90 calendar days and summarize their volume.

        Never raises: on a non-200 response, an empty result or any error
        (which is logged) every metric in the returned dict is None.
        """
        end_date = datetime.now()
        start_date = end_date - timedelta(days=90)

        url = (
            f"{self._AGGS_URL}/{ticker}/range/1/day/"
            f"{start_date:%Y-%m-%d}/{end_date:%Y-%m-%d}"
        )
        # Oldest first, so negative slices pick the most recent sessions.
        params = {"sort": "asc", "apiKey": self.api_key}

        try:
            data = await self._fetch_json(url, params)
            if data:
                bars = data.get('results') or []
                return self._summarize_volumes([bar['v'] for bar in bars])
        except Exception as e:
            self.logger.error("Error fetching volume metrics for %s: %s", ticker, e)

        return self._summarize_volumes([])

    async def _get_pre_market_data(self, ticker: str) -> Dict:
        """Build the result row for one ticker, or return None if it does not qualify.

        None is returned when any request fails, when there is no trade in
        today's 04:00-09:30 US/Eastern window, or when the latest pre-market
        trade is not above the previous close. Errors are logged, not raised.

        In the returned row 'Price' holds the previous close, 'Pre-market Open'
        holds the latest pre-market trade price, and both percentage columns
        carry the same value.
        """
        et_tz = pytz.timezone('US/Eastern')
        now = datetime.now(et_tz)
        # Localize the wall-clock bounds in Eastern time, then send them as
        # nanosecond epoch timestamps so the window is not shifted to UTC.
        window_open = et_tz.localize(datetime(now.year, now.month, now.day, 4, 0))
        window_close = et_tz.localize(datetime(now.year, now.month, now.day, 9, 30))
        api_key_only = {"apiKey": self.api_key}

        try:
            # Only the most recent trade in the window is needed.
            trades_params = {
                "timestamp.gte": int(window_open.timestamp()) * 1_000_000_000,
                "timestamp.lt": int(window_close.timestamp()) * 1_000_000_000,
                "order": "desc",
                "sort": "timestamp",
                "limit": 1,
                "apiKey": self.api_key
            }
            trades_data = await self._fetch_json(
                f"{self.base_url}/trades/{ticker}", trades_params
            )
            if not trades_data or not trades_data.get('results'):
                return None

            # Previous session's daily bar provides the reference close.
            prev_data = await self._fetch_json(
                f"{self._AGGS_URL}/{ticker}/prev", api_key_only
            )
            if not prev_data or not prev_data.get('results'):
                return None

            prev_close = prev_data['results'][0]['c']
            if not prev_close:
                # A missing or zero close cannot be used as a percentage base.
                return None

            latest_trade = trades_data['results'][0]
            # Accept the long ('price') and the short ('p') spelling of the field.
            latest_price = latest_trade.get('price', latest_trade.get('p'))
            if latest_price is None:
                return None

            pre_market_change = latest_price - prev_close
            if pre_market_change <= 0:
                return None

            # Reference details first, then volume metrics (sequential requests).
            details_data = await self._fetch_json(
                f"{self.base_url}/reference/tickers/{ticker}", api_key_only
            )
            if details_data is None:
                return None

            ticker_details = details_data.get('results', {})
            volume_metrics = await self._get_volume_metrics(ticker)

            market_cap = ticker_details.get('market_cap') or 0
            change_pct = round(pre_market_change / prev_close * 100, 2)

            return {
                'Symbol': ticker,
                'Description': ticker_details.get('name', ''),
                'marketCapType': self._get_market_cap_type(market_cap),
                'Pre-market Change %': change_pct,
                'Pre-market Gap %': change_pct,
                'Market capitalization': market_cap,
                'Price': prev_close,
                'Pre-market Open': latest_price,
                'Industry': ticker_details.get('sic_description', ''),
                'Index': ticker_details.get('composite_figi', ''),
                'Sector': ticker_details.get('sector', ''),
                'Exchange': ticker_details.get('primary_exchange', ''),
                'Float shares outstanding': ticker_details.get('share_class_shares_outstanding', 0),
                'Average Volume 10 days': volume_metrics['avg_vol_10'],
                'Average Volume 30 days': volume_metrics['avg_vol_30'],
                'Average Volume 90 days': volume_metrics['avg_vol_90'],
                'Relative Volume 1 day': volume_metrics['relative_vol_1d']
            }

        except Exception as e:
            self.logger.error("Error processing %s: %s", ticker, e)
            return None

    async def get_pre_market_gainers(self) -> pd.DataFrame:
        """Return a DataFrame of pre-market gainers, largest percentage change first.

        Opens the HTTP session, lists active stock tickers, keeps those whose
        primary exchange is XNAS or XNYS, evaluates each one with at most five
        tickers in flight, and always closes the session afterwards. The frame
        is empty (no columns) when nothing qualifies.

        NOTE: only a single page of up to 1000 tickers is requested; further
        pages of the listing are not followed.

        Raises:
            RuntimeError: if the ticker listing request does not return 200.
        """
        await self._init_session()

        try:
            data = await self._fetch_json(
                f"{self.base_url}/reference/tickers",
                {
                    "market": "stocks",
                    # Query values must be str/int/float; a bool is rejected.
                    "active": "true",
                    "limit": 1000,
                    "apiKey": self.api_key
                }
            )
            if data is None:
                raise RuntimeError("Failed to fetch tickers")

            tickers = [
                t['ticker'] for t in data.get('results', [])
                if t.get('market') == 'stocks'
                and t.get('primary_exchange') in ('XNAS', 'XNYS')
            ]

            # Cap the number of tickers processed at the same time.
            semaphore = asyncio.Semaphore(5)

            async def process_with_semaphore(ticker):
                async with semaphore:
                    return await self._get_pre_market_data(ticker)

            results = await asyncio.gather(
                *(process_with_semaphore(ticker) for ticker in tickers)
            )

            # Tickers that failed or did not qualify come back as None.
            valid_results = [r for r in results if r is not None]
            df = pd.DataFrame(valid_results)

            if not df.empty:
                df = df.sort_values('Pre-market Change %', ascending=False)

            return df

        finally:
            await self.session.close()
            # Drop the closed session so a later scan creates a fresh one.
            self.session = None

def run_scanner(api_key: str, base_dir_path: str, std_file_name_str: str):
    """Run the pre-market scan, write the result to CSV and print a short summary.

    Args:
        api_key: Polygon API key handed to ``PolygonPreMarketScanner``.
        base_dir_path: Output location prefix. It is concatenated as-is, so it
            must already end with a path separator if it names a directory.
        std_file_name_str: File name prefix placed before the date.

    Returns:
        Tuple ``(filename, df)``: the CSV path that was written and the
        DataFrame of gainers (possibly empty).
    """
    from concurrent.futures import ThreadPoolExecutor

    scanner = PolygonPreMarketScanner(api_key)

    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread: the plain entry point works.
        df = asyncio.run(scanner.get_pre_market_gainers())
    else:
        # A loop is already running here (e.g. inside a notebook), where
        # asyncio.run() would raise; run the scan on its own loop in a
        # worker thread and wait for the result.
        with ThreadPoolExecutor(max_workers=1) as pool:
            df = pool.submit(
                lambda: asyncio.run(scanner.get_pre_market_gainers())
            ).result()

    # Save to CSV, named after the local calendar date.
    today = datetime.now().strftime('%Y-%m-%d')
    filename = f"{base_dir_path}{std_file_name_str}{today}.csv"
    df.to_csv(filename, index=False)

    print(f"Saved {len(df)} pre-market gainers to {filename}")
    if 'Symbol' in df.columns:
        print("\nFirst 5 symbols:")
        print(df['Symbol'].head())
    else:
        # An empty scan yields a frame without columns; there is nothing to list.
        print("\nNo symbols to display.")

    return filename, df