In [1]:
import json
import logging
from typing import Optional
import requests
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path

In [11]:
class StockTickersDownloader:
    """
    Download currently listed stock tickers on NASDAQ and NYSE
    Data is from 'https://api.nasdaq.com/api/screener/stocks'
    """

    def __init__(self, output_dir: str = "active_tickers", user_agent: Optional[str] = None):
        """
        Prepare the HTTP session, the exchange list and the logger.

        Args:
            output_dir: Directory path, stored as a ``Path`` in ``self.output_dir``.
            user_agent: Optional ``User-Agent`` header value; a desktop Chrome
                string is used when it is omitted or empty.
        """
        self.output_dir = Path(output_dir)

        # Headers configuration
        self.session = requests.Session()
        self.session.headers.update(
            {
                "User-Agent": user_agent
                or "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
                "Accept": "application/json, text/plain, */*",
                "Referer": "https://www.nasdaq.com/",
            }
        )

        # Exchanges queried by this downloader
        self.exchanges = ["nasdaq", "nyse"]

        # Logging
        self._setup_logging()

    def _setup_logging(self):
        """
        Configure root logging (INFO level, timestamped format) and store a
        module-level logger in ``self.logger``.

        Defined in this class so that ``__init__`` works as soon as this cell
        has run, without depending on a later cell that extends the class.
        """
        logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
        self.logger = logging.getLogger(__name__)

In [13]:
class StockTickersDownloader(StockTickersDownloader):
    """
    Notebook-style extension: re-declares ``StockTickersDownloader`` on top of
    its previous definition to add logging setup, directory creation and the
    HTTP fetch helper.
    """

    def _setup_logging(self):
        """Configure root logging at INFO level and store a module logger in ``self.logger``."""
        logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
        # getLogger() returns a logger registered in the logging hierarchy, so
        # its records propagate to the root handler configured above.
        # Instantiating logging.Logger directly yields a detached logger whose
        # INFO records never reach that handler.
        self.logger = logging.getLogger(__name__)

    def _create_dirs(self):
        """Create one sub-directory of ``self.output_dir`` per exchange, plus ``all``."""
        for exchange in self.exchanges + ["all"]:
            (self.output_dir / exchange).mkdir(parents=True, exist_ok=True)

    def _fetch_data(self, exchange: str) -> dict:
        """
        Fetch the screener payload for one exchange.

        Args:
            exchange: Value sent as the ``exchange`` query parameter.

        Returns:
            The decoded JSON response, or an empty dict when the request fails,
            the server answers with an HTTP error status, or the body is not
            valid JSON.
        """
        try:
            params = {"tableonly": "true", "download": "true", "exchange": exchange}
            response = self.session.get("https://api.nasdaq.com/api/screener/stocks", params=params, timeout=30)
            # Turn 4xx/5xx answers into HTTPError (a RequestException) instead of
            # trying to parse an error page as ticker data.
            response.raise_for_status()
            data = response.json()

        except requests.exceptions.RequestException as e:
            self.logger.error(f"Failed to fetch data for {exchange}: {e}")
            return {}

        except json.JSONDecodeError as e:
            self.logger.error(f"Invalid JSON response for {exchange}: {e}")
            return {}

        # Tolerate a missing or null "data"/"rows" entry when counting rows.
        rows = (data.get("data") or {}).get("rows") or []
        self.logger.info(f"Succesfully fetched {len(rows)} tickers for {exchange}")
        return data
        
    

In [14]:
# Instantiate the downloader with "stock_data_test" as its output directory.
downloader = StockTickersDownloader(output_dir="stock_data_test")

In [17]:
# List the attributes available on the requests session.
# (Calling `session.get()` without a URL raises TypeError; the recorded
# output of this cell is the attribute listing produced by dir().)
dir(downloader.session)

['__attrs__',
 '__class__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__enter__',
 '__eq__',
 '__exit__',
 '__format__',
 '__ge__',
 '__getattribute__',
 '__getstate__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__le__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__setstate__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 'adapters',
 'auth',
 'cert',
 'close',
 'cookies',
 'delete',
 'get',
 'get_adapter',
 'get_redirect_target',
 'head',
 'headers',
 'hooks',
 'max_redirects',
 'merge_environment_settings',
 'mount',
 'options',
 'params',
 'patch',
 'post',
 'prepare_request',
 'proxies',
 'put',
 'rebuild_auth',
 'rebuild_method',
 'rebuild_proxies',
 'request',
 'resolve_redirects',
 'send',
 'should_strip_auth',
 'stream',
 'trust_env',
 'verify']

In [14]:
class StockTickersDownloader:
    """
    Downloader for the stock tickers currently listed on NASDAQ and NYSE.
    Source endpoint: 'https://api.nasdaq.com/api/screener/stocks'

    This definition only provides the logging setup helper.
    """

    def _setup_logging(self):
        """Set up root logging (INFO, timestamped format) and keep a module logger on ``self.logger``."""
        log_format = "%(asctime)s - %(levelname)s - %(message)s"
        logging.basicConfig(format=log_format, level=logging.INFO)
        self.logger = logging.getLogger(__name__)

In [35]:
class StockTickerDownloader:
    """
    Download stock tickers actively trading on NASDAQ and NYSE.
    Data is from "https://api.nasdaq.com/api/screener/stocks"
    """

    def __init__(
        self, output_dir: str = "active_tickers", user_agent: Optional[str] = None
    ):
        """
        Prepare the HTTP session, the exchange list and the logger.

        Args:
            output_dir: Directory path, stored as a ``Path`` in ``self.output_dir``.
            user_agent: Optional ``User-Agent`` header value; a desktop Chrome
                string is used when it is omitted or empty.
        """
        self.output_dir = Path(output_dir)
        self.session = requests.Session()

        # Header configuration
        self.session.headers.update(
            {
                "User-Agent": user_agent
                or "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
                "Accept": "application/json, text/plain, */*",
                "Referer": "https://www.nasdaq.com/",
            }
        )

        self.exchanges = ["nasdaq", "nyse"]
        self._setup_logging()

    def _setup_logging(self):
        """
        Configure root logging (INFO level, timestamped format) and store a
        module-level logger in ``self.logger``.

        Defined in this class so that instantiation works as soon as this cell
        has run; ``__init__`` calls it and would otherwise raise AttributeError.
        """
        logging.basicConfig(
            level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
        )
        self.logger = logging.getLogger(__name__)

In [22]:
# Bare expression: the notebook displays the repr of the class object.
StockTickerDownloader

__main__.StockTickerDownloader

In [39]:
# Instantiate the downloader with "stock_data_test" as its output directory.
downloader = StockTickerDownloader(output_dir="stock_data_test")

In [32]:
# Display the list of exchanges stored on the instance.
downloader.exchanges

['nasdaq', 'nyse']

In [38]:
class StockTickerDownloader(StockTickerDownloader):
    """Re-declares the class on top of its previous definition to provide the logging setup helper."""

    def _setup_logging(self):
        """Set up root logging (INFO, timestamped format) and keep a module logger on ``self.logger``."""
        log_format = "%(asctime)s - %(levelname)s - %(message)s"
        logging.basicConfig(format=log_format, level=logging.INFO)
        self.logger = logging.getLogger(__name__)

In [None]:
class StockTickerDownloader:
    """
    Download stock tickers actively trading on NASDAQ and NYSE.
    Data is from "https://api.nasdaq.com/api/screener/stocks"
    """

    def __init__(
        self, output_dir: str = "active_tickers", user_agent: Optional[str] = None
    ):
        """
        Prepare the HTTP session, the exchange list and the logger.

        Every other method of this class reads ``self.output_dir``,
        ``self.session``, ``self.exchanges`` or ``self.logger``, so the
        constructor lives in the same definition to keep the class usable on
        its own.

        Args:
            output_dir: Root directory for the generated files.
            user_agent: Optional ``User-Agent`` header value; a desktop Chrome
                string is used when it is omitted or empty.
        """
        self.output_dir = Path(output_dir)
        self.session = requests.Session()

        # Header configuration
        self.session.headers.update(
            {
                "User-Agent": user_agent
                or "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
                "Accept": "application/json, text/plain, */*",
                "Referer": "https://www.nasdaq.com/",
            }
        )

        self.exchanges = ["nasdaq", "nyse"]
        self._setup_logging()

    def _setup_logging(self):
        """Configure root logging (INFO level, timestamped format) and store a module logger in ``self.logger``."""
        logging.basicConfig(
            level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
        )
        self.logger = logging.getLogger(__name__)

    def _create_dirs(self):
        """Create one sub-directory of ``self.output_dir`` per exchange, plus ``all``."""
        for exchange in self.exchanges + ["all"]:
            (self.output_dir / exchange).mkdir(parents=True, exist_ok=True)

    def _fetch_data(self, exchange: str) -> dict:
        """
        Fetch the screener payload for one exchange.

        Returns:
            The decoded JSON response, or an empty dict when anything goes
            wrong (network error, HTTP error status, invalid JSON, or a payload
            without ``data.rows``); the error is logged.
        """
        try:
            params = {"tableonly": "true", "download": "true", "exchange": exchange}
            response = self.session.get(
                "https://api.nasdaq.com/api/screener/stocks", params=params, timeout=30
            )
            response.raise_for_status()
            data = response.json()
            self.logger.info(
                f"Fetched {len(data['data']['rows'])} tickers for {exchange}"
            )
            return data
        except Exception as e:
            # Deliberately broad: a failing exchange must not abort the others.
            self.logger.error(f"Erreur pour {exchange}: {e}")
            return {}

    def _save_files(self, exchange: str, data: dict):
        """
        Write the data of one exchange in three formats.

        Files written under ``<output_dir>/<exchange>/``:
            ``<exchange>_full_tickers.json``: every row, as returned.
            ``<exchange>_tickers.json``: JSON list of the non-empty symbols.
            ``<exchange>_tickers.txt``: the same symbols, one per line.
        """
        exchange_dir = self.output_dir / exchange
        rows = data.get("data", {}).get("rows", [])
        symbols = [row["symbol"] for row in rows if row.get("symbol")]

        # File paths
        paths = {
            "full": exchange_dir / f"{exchange}_full_tickers.json",
            "symbols": exchange_dir / f"{exchange}_tickers.json",
            "txt": exchange_dir / f"{exchange}_tickers.txt",
        }

        # Save with an explicit encoding so the output does not depend on the
        # platform's default locale.
        paths["full"].write_text(json.dumps(rows, indent=2), encoding="utf-8")
        paths["symbols"].write_text(json.dumps(symbols, indent=2), encoding="utf-8")
        paths["txt"].write_text("\n".join(symbols), encoding="utf-8")

        self.logger.info(f"Sauvegardé {len(symbols)} tickers pour {exchange}")

    def _process_exchange(self, exchange: str) -> bool:
        """Fetch and save the data of one exchange; return True when data was obtained."""
        data = self._fetch_data(exchange)
        if data:
            self._save_files(exchange, data)
            return True
        return False

    def _combine_all(self):
        """
        Merge the per-exchange ``*_tickers.txt`` files that exist on disk into
        ``all/all_tickers.txt`` as a sorted list of unique symbols.
        """
        all_symbols = set()
        for exchange in self.exchanges:
            txt_path = self.output_dir / exchange / f"{exchange}_tickers.txt"
            if txt_path.exists():
                all_symbols.update(
                    txt_path.read_text(encoding="utf-8").strip().splitlines()
                )

        (self.output_dir / "all" / "all_tickers.txt").write_text(
            "\n".join(sorted(all_symbols)), encoding="utf-8"
        )
        self.logger.info(f"Combined {len(all_symbols)} unique symbols")

    def run(self, max_workers: int = 2) -> bool:
        """
        Run the complete download.

        Args:
            max_workers: Size of the thread pool used to process exchanges.

        Returns:
            True when at least one exchange was downloaded and the combined
            file was written, False otherwise.
        """
        self._create_dirs()

        # Parallel download, one task per exchange
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            results = list(executor.map(self._process_exchange, self.exchanges))

        success_count = sum(results)
        self.logger.info(f"Réussi: {success_count}/{len(self.exchanges)}")

        if success_count > 0:
            self._combine_all()
            return True
        return False


In [None]:
def main():
    """
    Download the tickers into ``stock_data`` and report the outcome.

    Prints a success message when ``run`` returns True; otherwise prints an
    error message and raises ``SystemExit(1)``.
    """
    downloader = StockTickerDownloader(output_dir="stock_data")
    if downloader.run(max_workers=2):
        print("Téléchargement terminé avec succès!")
    else:
        print("Erreurs détectées.")
        # `exit` is a helper meant for interactive sessions (and is replaced
        # by IPython inside a notebook kernel); raising SystemExit gives the
        # same exit status without relying on it.
        raise SystemExit(1)