In [1]:
import requests
import pandas as pd
import time

In [2]:
import os

# Prefer a key supplied through the ALPHAVANTAGE_API_KEY environment variable so
# the secret does not have to live in the notebook. The literal is kept only as
# a fallback so existing runs keep working.
# NOTE(review): this key is visible in the saved notebook - consider rotating it
# and removing the fallback.
API_KEY = os.environ.get('ALPHAVANTAGE_API_KEY', 'P3GEXXI9LB6K6GFQ')

In [3]:
def test_single_stock(symbol, timeout=30):
    """Fetch the compact daily series for one symbol and return the raw JSON.

    Smoke test for the Alpha Vantage connection: prints the top-level keys of
    the response so the payload shape can be inspected.

    Args:
        symbol: Ticker to query (e.g. 'AAPL').
        timeout: Seconds to wait for the server before giving up. Without a
            timeout, `requests.get` can block indefinitely.

    Returns:
        The decoded JSON response as a dict.

    Raises:
        requests.exceptions.RequestException: on network failure, timeout or
            a non-2xx HTTP status.
    """
    url = 'https://www.alphavantage.co/query'
    params = {
        'function': 'TIME_SERIES_DAILY',
        'symbol': symbol,
        'apikey': API_KEY,
        'outputsize': 'compact'  # latest 100 data points
    }

    response = requests.get(url, params=params, timeout=timeout)
    # Fail loudly on HTTP errors instead of trying to decode an error page.
    response.raise_for_status()
    data = response.json()

    print(f'Response keys for {symbol}:', list(data.keys()))
    return data

In [4]:
# Smoke-test the API connection with a single symbol and keep the raw JSON.
test_data = test_single_stock('AAPL')

Response keys for AAPL: ['Meta Data', 'Time Series (Daily)']


In [5]:
# Echo the raw response dict so the notebook renders its structure.
test_data

{'Meta Data': {'1. Information': 'Daily Prices (open, high, low, close) and Volumes',
  '2. Symbol': 'AAPL',
  '3. Last Refreshed': '2025-09-02',
  '4. Output Size': 'Compact',
  '5. Time Zone': 'US/Eastern'},
 'Time Series (Daily)': {'2025-09-02': {'1. open': '229.4900',
   '2. high': '230.7800',
   '3. low': '226.9700',
   '4. close': '229.7200',
   '5. volume': '43657351'},
  '2025-08-29': {'1. open': '232.5100',
   '2. high': '233.3800',
   '3. low': '231.3700',
   '4. close': '232.1400',
   '5. volume': '39418437'},
  '2025-08-28': {'1. open': '230.8200',
   '2. high': '233.4100',
   '3. low': '229.3350',
   '4. close': '232.5600',
   '5. volume': '38074700'},
  '2025-08-27': {'1. open': '228.6100',
   '2. high': '230.9000',
   '3. low': '228.2600',
   '4. close': '230.4900',
   '5. volume': '31259513'},
  '2025-08-26': {'1. open': '226.8700',
   '2. high': '229.4900',
   '3. low': '224.6900',
   '4. close': '229.3100',
   '5. volume': '54575107'},
  '2025-08-25': {'1. open': '226

In [9]:
import numpy as np
from datetime import datetime, timedelta
import os
from typing import Optional, Tuple, Union

In [16]:
class DataManager:
    """Fetch daily OHLCV data from Alpha Vantage with a per-symbol CSV cache.

    Each symbol is cached in ``<cache_dir>/<SYMBOL>_daily.csv``. The public
    entry point is :meth:`get_stock_data`; the underscore-prefixed methods are
    helpers for date handling, cache I/O and the API call.
    """

    # A 'compact' response only holds the latest 100 data points, so it can
    # only satisfy requests whose start date is at most this many calendar
    # days in the past (100 calendar days is always fewer than 100 trading days).
    COMPACT_WINDOW_DAYS = 100
    # Requests spanning fewer days than this are always refreshed.
    SHORT_REQUEST_DAYS = 5
    # Gap size (in days) separating the "small gap" and "large gap" messages.
    GAP_THRESHOLD_DAYS = 30
    # Seconds to wait for the API before giving up.
    REQUEST_TIMEOUT_SECONDS = 30

    def __init__(self, api_key: str, cache_dir: str = "./data_cache"):
        """Store the API key and make sure the cache directory exists.

        Args:
            api_key: Alpha Vantage API key sent with every request.
            cache_dir: Directory that holds the per-symbol CSV files.
        """
        self.api_key = api_key
        self.cache_dir = cache_dir

        # exist_ok avoids the check-then-create race of a separate exists() test.
        os.makedirs(cache_dir, exist_ok=True)

    def _normalize_date_input(self, date_input: Union[str, datetime, pd.Timestamp]) -> pd.Timestamp:
        """Convert a supported date input to a midnight-normalized Timestamp.

        The data is daily, so any time-of-day component is dropped; otherwise
        a start date such as "now minus two years" would exclude that day's
        row from range filters.

        Args:
            date_input: A date string, the literal 'TODAY' (case-insensitive),
                a ``datetime`` or a ``pd.Timestamp``.

        Returns:
            The corresponding ``pd.Timestamp`` at 00:00.

        Raises:
            ValueError: if the type is unsupported or the string cannot be
                interpreted as a date.
        """
        if isinstance(date_input, str):
            if date_input.upper() == 'TODAY':
                # Calendar "today"; rolling back to a business day is done by
                # _needs_refresh, not here.
                return pd.Timestamp.today().normalize()
            parsed = pd.to_datetime(date_input)
            if pd.isna(parsed):
                raise ValueError(f"Could not parse date: {date_input!r}")
            return parsed.normalize()
        if isinstance(date_input, datetime):
            # pd.Timestamp subclasses datetime, so this branch covers both.
            return pd.Timestamp(date_input).normalize()
        raise ValueError(f"Unsupported date format: {type(date_input)}")

    def _get_cache_file_path(self, symbol: str) -> str:
        """Return the CSV path used to cache ``symbol`` (upper-cased)."""
        return os.path.join(self.cache_dir, f"{symbol.upper()}_daily.csv")

    def _load_from_cache(self, symbol: str) -> Optional[pd.DataFrame]:
        """Load the cached frame for ``symbol``.

        Returns:
            The cached DataFrame indexed by 'Date', or None when the file is
            missing or cannot be read (the read error is printed).
        """
        cache_file = self._get_cache_file_path(symbol)

        if not os.path.exists(cache_file):
            return None

        try:
            return pd.read_csv(cache_file, index_col='Date', parse_dates=True)
        except Exception as e:
            # Best effort: an unreadable cache is treated like a missing one.
            print(f"Error loading cache for {symbol}: {e}")
            return None

    def _get_cache_date_range(
        self, cached_data: Optional[pd.DataFrame]
    ) -> Tuple[Optional[pd.Timestamp], Optional[pd.Timestamp]]:
        """Return ``(first_date, last_date)`` of the cached index.

        Returns ``(None, None)`` when there is no cached data or it is empty.
        """
        if cached_data is None or cached_data.empty:
            return None, None

        return cached_data.index.min(), cached_data.index.max()

    def _needs_refresh(self, symbol: str, requested_start: Union[str, datetime, pd.Timestamp],
                      requested_end: Union[str, datetime, pd.Timestamp]) -> bool:
        """Decide whether the cache can serve the requested date range.

        The request is first snapped to business days (start rolls forward,
        end rolls back). The cache is considered sufficient only when it fully
        covers the snapped range AND the range spans at least
        ``SHORT_REQUEST_DAYS`` days; every other situation asks for a refresh.
        The gap calculations below only select which diagnostic is printed.

        Returns:
            True if a refresh is needed, False if the cache is sufficient.
        """
        req_start = self._normalize_date_input(requested_start)
        req_end = self._normalize_date_input(requested_end)

        # Snap to business days: first business day >= start, last <= end.
        req_start = pd.bdate_range(start=req_start, periods=1)[0]
        req_end = pd.bdate_range(end=req_end, periods=1)[0]

        cached_data = self._load_from_cache(symbol)
        # An empty cache has no date range to compare against, so it is
        # handled like a missing one instead of comparing None to a Timestamp.
        if cached_data is None or cached_data.empty:
            print(f"No cached data for {symbol} - refresh needed")
            return True

        cached_start, cached_end = self._get_cache_date_range(cached_data)
        print(f"Cache range: {cached_start} to {cached_end}")
        print(f"Request range: {req_start} to {req_end}")

        request_period_days = (req_end - req_start).days

        # Case 1: request fully inside the cached range.
        if cached_start <= req_start and req_end <= cached_end:
            if request_period_days < self.SHORT_REQUEST_DAYS:
                print(f"Short-term request ({request_period_days} days) - refreshing for latest data")
                return True
            print("Request fully covered by cache - using cached data")
            return False

        # Case 2: exactly one end of the request lies inside the cache.
        start_within = cached_start <= req_start <= cached_end
        end_within = cached_start <= req_end <= cached_end

        if start_within and not end_within:
            gap_days = (req_end - cached_end).days
            if gap_days > self.GAP_THRESHOLD_DAYS:
                print(f"Large gap beyond cache ({gap_days} days) - refresh needed")
            else:
                print(f"Small gap beyond cache ({gap_days} days) - refresh for extension")
            return True

        if end_within and not start_within:
            gap_days = (cached_start - req_start).days
            if gap_days > self.GAP_THRESHOLD_DAYS:
                print(f"Large gap before cache ({gap_days} days) - refresh needed")
            else:
                print(f"Small gap before cache ({gap_days} days) - refresh for extension")
            return True

        # Case 3: neither end is inside the cache (no overlap, or the request
        # extends past the cache on both sides).
        start_gap = min(abs((cached_start - req_start).days), abs((cached_end - req_start).days))
        end_gap = min(abs((cached_start - req_end).days), abs((cached_end - req_end).days))

        if start_gap < self.GAP_THRESHOLD_DAYS and end_gap < self.GAP_THRESHOLD_DAYS:
            print("Request close to cached range - refresh for continuous data")
        else:
            print("Request far from cached range - refresh needed")
        return True

    def _fetch_from_api(self, symbol: str, outputsize: str = 'full') -> pd.DataFrame:
        """Fetch the daily series for ``symbol`` and return a clean DataFrame.

        Args:
            symbol: Ticker to query; upper-cased before the request.
            outputsize: 'compact' (latest 100 data points) or 'full'.

        Returns:
            DataFrame with numeric Open/High/Low/Close/Volume columns and a
            DatetimeIndex sorted ascending.

        Raises:
            requests.exceptions.RequestException: network failure, timeout or
                non-2xx HTTP status.
            ValueError: the API reported an error / throttling notice, or the
                payload does not have the expected shape.
        """
        print(f"Fetching {symbol} from API...")
        time.sleep(1)  # Rate limiting

        symbol = symbol.upper()

        url = 'https://www.alphavantage.co/query'
        params = {
            'function': 'TIME_SERIES_DAILY',
            'symbol': symbol,
            'apikey': self.api_key,
            'outputsize': outputsize
        }
        time_series_key = 'Time Series (Daily)'
        # Map by key rather than by position so a reordered or partial payload
        # cannot silently mislabel columns.
        column_names = {
            '1. open': 'Open',
            '2. high': 'High',
            '3. low': 'Low',
            '4. close': 'Close',
            '5. volume': 'Volume',
        }

        try:
            # Without a timeout a stalled connection would block forever.
            response = requests.get(url, params=params, timeout=self.REQUEST_TIMEOUT_SECONDS)
            response.raise_for_status()
            data = response.json()

            # Check for API-level errors delivered inside a 200 response.
            if 'Error Message' in data:
                raise ValueError(f"API Error: {data['Error Message']}")

            if 'Note' in data:
                raise ValueError(f"API Rate Limit: {data['Note']}")

            # NOTE(review): the service also appears to report throttling /
            # plan notices under 'Information'; surface the text when present.
            if 'Information' in data and time_series_key not in data:
                raise ValueError(f"API Notice: {data['Information']}")

            if time_series_key not in data:
                available_keys = list(data.keys())
                raise ValueError(f"Expected key '{time_series_key}' not found. Available keys: {available_keys}")

            df = pd.DataFrame.from_dict(data[time_series_key], orient='index')

            missing = [key for key in column_names if key not in df.columns]
            if missing:
                raise ValueError(f"Unexpected time series payload for {symbol}; missing fields: {missing}")

            df = df.rename(columns=column_names)[list(column_names.values())]

            # Values arrive as strings; unparseable entries become NaN.
            for col in df.columns:
                df[col] = pd.to_numeric(df[col], errors='coerce')

            df.index = pd.to_datetime(df.index)
            df = df.sort_index()

            print(f"Successfully fetched {len(df)} days of data for {symbol}")
            return df

        except requests.exceptions.RequestException as e:
            print(f"Network error fetching {symbol}: {e}")
            raise
        except Exception as e:
            print(f"Error processing {symbol} data: {e}")
            raise

    def _save_to_cache(self, symbol: str, df: pd.DataFrame):
        """Write ``df`` to the symbol's CSV cache; write errors are printed, not raised."""
        cache_file = self._get_cache_file_path(symbol)
        try:
            df.to_csv(cache_file, index_label='Date')
            print(f"Cached {len(df)} days of data for {symbol}")
        except Exception as e:
            print(f"Error saving cache for {symbol}: {e}")

    def _merge_with_cache(self, symbol: str, fresh_data: pd.DataFrame) -> pd.DataFrame:
        """Combine freshly fetched rows with the existing cache for ``symbol``.

        A 'compact' fetch only carries the latest rows, so writing it over the
        cache would throw away older history. The two frames are unioned
        (fresh rows win on duplicate dates) when their date ranges overlap.
        If they do not overlap, or the cache is missing/unusable, the fresh
        frame is returned unchanged so the cache never contains a silent hole.
        """
        cached_data = self._load_from_cache(symbol)
        if cached_data is None or cached_data.empty or fresh_data.empty:
            return fresh_data
        if not isinstance(cached_data.index, pd.DatetimeIndex):
            return fresh_data
        if list(cached_data.columns) != list(fresh_data.columns):
            return fresh_data

        overlaps = (cached_data.index.max() >= fresh_data.index.min()
                    and cached_data.index.min() <= fresh_data.index.max())
        if not overlaps:
            return fresh_data

        combined = pd.concat([cached_data, fresh_data])
        # keep='last' keeps the fresh row when a date exists in both frames.
        combined = combined[~combined.index.duplicated(keep='last')]
        return combined.sort_index()

    @staticmethod
    def _filter_date_range(df: pd.DataFrame, start_date: pd.Timestamp, end_date: pd.Timestamp) -> pd.DataFrame:
        """Return the rows of ``df`` whose index lies in ``[start_date, end_date]``."""
        mask = (df.index >= start_date) & (df.index <= end_date)
        return df.loc[mask]

    def get_stock_data(self, symbol: str, start_date: Union[str, datetime, pd.Timestamp] = None,
                      end_date: Union[str, datetime, pd.Timestamp] = "TODAY",
                      force_refresh: bool = False) -> pd.DataFrame:
        """
        Main method to get stock data - handles caching logic automatically

        Args:
            symbol: Stock ticker (e.g., 'AAPL')
            start_date: Start date for data (default: 2 years ago)
            end_date: End date for data (default: today)
            force_refresh: If True, always fetch from API regardless of cache

        Returns:
            DataFrame with OHLCV data restricted to [start_date, end_date]

        Raises:
            Exception: re-raises the fetch/processing error when the API call
                fails and no cached data is available as a fallback.
        """
        symbol = symbol.upper()

        # Set default start date if not provided
        if start_date is None:
            start_date = pd.Timestamp.today() - pd.DateOffset(years=2)

        # Normalize dates (also drops any time-of-day component)
        start_date = self._normalize_date_input(start_date)
        end_date = self._normalize_date_input(end_date)

        print(f"\n=== Getting {symbol} data from {start_date.date()} to {end_date.date()} ===")

        # Check if refresh is needed (unless forced)
        if not force_refresh and not self._needs_refresh(symbol, start_date, end_date):
            print("Using cached data")
            cached_data = self._load_from_cache(symbol)
            return self._filter_date_range(cached_data, start_date, end_date)

        # Fetch fresh data from API
        try:
            # 'compact' only contains the most recent rows, so it is usable
            # only when the requested start is itself recent. The length of
            # the request alone says nothing about whether it is recent.
            days_since_start = (pd.Timestamp.today().normalize() - start_date).days
            outputsize = 'compact' if days_since_start <= self.COMPACT_WINDOW_DAYS else 'full'

            fresh_data = self._fetch_from_api(symbol, outputsize=outputsize)

            # Keep previously cached history instead of overwriting it with a
            # possibly shorter response.
            merged_data = self._merge_with_cache(symbol, fresh_data)
            self._save_to_cache(symbol, merged_data)

            filtered_data = self._filter_date_range(merged_data, start_date, end_date)

            print(f"Returning {len(filtered_data)} days of data")
            return filtered_data

        except Exception as e:
            print(f"API fetch failed: {e}")
            # Try to fall back to cache
            print("Attempting to use cached data as fallback...")
            cached_data = self._load_from_cache(symbol)
            if cached_data is not None:
                print("Using cached data as fallback")
                return self._filter_date_range(cached_data, start_date, end_date)
            else:
                print("No cached data available")
                raise

In [17]:
# Reuse the notebook-level API_KEY instead of repeating the literal key here.
dm = DataManager(API_KEY)

In [19]:
# Example: pull roughly a year of COIN history through the cache-aware manager.
data = dm.get_stock_data('COIN', start_date='2024-10-01', end_date='TODAY')
if data.empty:
    # .iloc[-1] would raise IndexError on an empty frame.
    print("COIN: no rows returned for the requested range")
else:
    print(f"COIN: {len(data)} days, Latest close: ${data['Close'].iloc[-1]:.2f}")


=== Getting COIN data from 2024-10-01 to 2025-09-03 ===
Cache range: 2021-04-14 00:00:00 to 2025-09-02 00:00:00
Request range: 2024-10-01 00:00:00 to 2025-09-03 00:00:00
Small gap beyond cache (1 days) - refresh for extension
Fetching COIN from API...
Successfully fetched 1102 days of data for COIN
Cached 1102 days of data for COIN
Returning 230 days of data
COIN: 230 days, Latest close: $303.56
