## Yearly S&P 500 Holdings and saving output into csv file

In [3]:
import pandas as pd
import datetime as dt
from typing import Dict, List
import logging
from pathlib import Path

def setup_logging():
    """Configure basic logging for the script."""
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s'
    )

def parse_dates(date_str):
    """
    Parse dates in multiple formats.
    
    Args:
        date_str: Date string to parse
    Returns:
        Parsed datetime object
    """
    try:
        # Try different date formats
        for fmt in [
            "%B %d, %Y",    # September 23, 2019
            "%Y-%m-%d",     # 1957-03-04
            "%m/%d/%Y"      # 03/04/1957
        ]:
            try:
                return pd.to_datetime(date_str, format=fmt)
            except ValueError:
                continue
                
        # If none of the specific formats work, try the default parser
        return pd.to_datetime(date_str)
        
    except Exception as e:
        logging.warning(f"Could not parse date: {date_str}. Error: {str(e)}")
        return None

def get_sp500_history() -> pd.DataFrame:
    """
    Fetch and process S&P 500 historical data from Wikipedia.
    Returns processed historical data DataFrame.
    """
    url = 'https://en.wikipedia.org/wiki/List_of_S%26P_500_companies'
    data = pd.read_html(url)
    
    # Process current companies
    current = data[0].iloc[:, [0, 1, 5, 6]].copy()
    current.columns = ['ticker', 'name', 'date', 'cik']
    current['cik'] = current['cik'].astype(str).str.zfill(10)
    current['date'] = current['date'].apply(parse_dates)
    
    # Process historical changes
    adjustments = data[1].copy()
    adjustments.columns = ['date', 'ticker_added', 'name_added', 
                         'ticker_removed', 'name_removed', 'reason']
    adjustments['date'] = adjustments['date'].apply(parse_dates)
    
    # Process additions and removals
    additions = (adjustments[~adjustments['ticker_added'].isnull()]
                [['date', 'ticker_added', 'name_added']]
                .rename(columns={'ticker_added': 'ticker', 
                               'name_added': 'name'}))
    additions['action'] = 'added'
    
    removals = (adjustments[~adjustments['ticker_removed'].isnull()]
                [['date', 'ticker_removed', 'name_removed']]
                .rename(columns={'ticker_removed': 'ticker', 
                               'name_removed': 'name'}))
    removals['action'] = 'removed'
    
    # Combine historical changes
    historical = pd.concat([additions, removals])
    
    # Add missing companies
    missing = current[~current['ticker'].isin(historical['ticker'])].copy()
    missing['action'] = 'added'
    missing = missing[['date', 'ticker', 'name', 'action', 'cik']]
    
    # Create complete history
    complete_history = pd.concat([historical, missing])
    complete_history = complete_history.sort_values(['date', 'ticker'])
    
    # Remove any rows where date parsing failed
    complete_history = complete_history.dropna(subset=['date'])
    
    return complete_history, current

def create_yearly_holdings(history: pd.DataFrame, current: pd.DataFrame) -> Dict[int, pd.DataFrame]:
    """
    Create yearly snapshots of S&P 500 holdings from 2008 to present.
    
    Args:
        history: DataFrame with historical changes
        current: DataFrame with current companies
    Returns:
        Dictionary with year as key and holdings DataFrame as value
    """
    # Initialize with current holdings
    holdings: Dict[int, set] = {}
    current_year = dt.datetime.now().year
    
    # Create set of current tickers
    current_tickers = set(current['ticker'])
    
    # Initialize all years from 2008 to present with current holdings
    for year in range(2008, current_year + 1):
        holdings[year] = current_tickers.copy()
    
    # Process historical changes backwards
    for _, row in history.sort_values('date', ascending=False).iterrows():
        year = row['date'].year
        if year >= 2008:
            if row['action'] == 'added':
                # Before addition, company wasn't in index
                for y in range(2008, row['date'].year + 1):
                    holdings[y].discard(row['ticker'])
            else:  # removal
                # Before removal, company was in index
                for y in range(2008, row['date'].year + 1):
                    holdings[y].add(row['ticker'])

    # Convert sets to DataFrames with company info
    yearly_holdings = {}
    company_info = pd.concat([
        current[['ticker', 'name', 'cik']],
        history[['ticker', 'name']].drop_duplicates()
    ]).drop_duplicates('ticker')
    
    for year in holdings:
        year_df = (
            pd.DataFrame({'ticker': list(holdings[year])})\
            .merge(company_info, on='ticker', how='left')
        )
        year_df['year'] = year
        year_df = year_df[['year', 'ticker', 'name', 'cik']]
        yearly_holdings[year] = year_df
    
    return yearly_holdings

def save_yearly_holdings(holdings: Dict[int, pd.DataFrame], output_dir: str = 'sp500_holdings'):
    """
    Save yearly holdings to CSV files.
    
    Args:
        holdings: Dictionary with year as key and holdings DataFrame as value
        output_dir: Directory to save the files
    """
    # Create output directory if it doesn't exist
    Path(output_dir).mkdir(parents=True, exist_ok=True)
    
    # Save individual year files
    for year, df in holdings.items():
        filename = f"{output_dir}/sp500_holdings_{year}.csv"
        df.to_csv(filename, index=False)
        logging.info(f"Saved holdings for {year} to {filename}")
    
    # Create combined file
    combined = pd.concat(holdings.values())
    combined.to_csv(f"{output_dir}/sp500_holdings_all_years.csv", index=False)
    logging.info("Saved combined holdings file")

def main():
    """Main function to orchestrate the S&P 500 holdings generation."""
    setup_logging()
    logging.info("Starting S&P 500 holdings generation")
    
    try:
        # Get historical data
        history, current = get_sp500_history()
        logging.info("Retrieved historical data")
        
        # Create yearly holdings
        yearly_holdings = create_yearly_holdings(history, current)
        logging.info(f"Created holdings for {len(yearly_holdings)} years")
        
        # Save results
        save_yearly_holdings(yearly_holdings)
        logging.info("Processing completed successfully")
        
        return yearly_holdings
        
    except Exception as e:
        logging.error(f"Error in main processing: {str(e)}")
        raise

if __name__ == "__main__":
    holdings = main()

2025-01-03 17:41:47,693 - INFO - Starting S&P 500 holdings generation
2025-01-03 17:41:48,171 - INFO - Retrieved historical data
2025-01-03 17:41:48,291 - INFO - Created holdings for 18 years
2025-01-03 17:41:48,298 - INFO - Saved holdings for 2008 to sp500_holdings/sp500_holdings_2008.csv
2025-01-03 17:41:48,302 - INFO - Saved holdings for 2009 to sp500_holdings/sp500_holdings_2009.csv
2025-01-03 17:41:48,306 - INFO - Saved holdings for 2010 to sp500_holdings/sp500_holdings_2010.csv
2025-01-03 17:41:48,311 - INFO - Saved holdings for 2011 to sp500_holdings/sp500_holdings_2011.csv
2025-01-03 17:41:48,314 - INFO - Saved holdings for 2012 to sp500_holdings/sp500_holdings_2012.csv
2025-01-03 17:41:48,317 - INFO - Saved holdings for 2013 to sp500_holdings/sp500_holdings_2013.csv
2025-01-03 17:41:48,319 - INFO - Saved holdings for 2014 to sp500_holdings/sp500_holdings_2014.csv
2025-01-03 17:41:48,322 - INFO - Saved holdings for 2015 to sp500_holdings/sp500_holdings_2015.csv
2025-01-03 17:41

## Yearly S&P 500 Holdings and Storing into database using sqlite 

In [4]:
import sqlite3
from typing import Dict
import pandas as pd
import logging
from pathlib import Path

def setup_database(db_path: str = 'sp500_holdings.db') -> sqlite3.Connection:
    """
    Create SQLite database and required tables.
    
    Args:
        db_path: Path to the SQLite database file
    Returns:
        Database connection object
    """
    conn = sqlite3.connect(db_path)
    
    # Create tables
    with conn:
        # Companies table to store unique company information
        conn.execute('''
        CREATE TABLE IF NOT EXISTS companies (
            ticker TEXT PRIMARY KEY,
            name TEXT NOT NULL,
            cik TEXT
        )
        ''')
        
        # Holdings table to store yearly holdings with foreign key to companies
        conn.execute('''
        CREATE TABLE IF NOT EXISTS holdings (
            year INTEGER,
            ticker TEXT,
            PRIMARY KEY (year, ticker),
            FOREIGN KEY (ticker) REFERENCES companies(ticker)
        )
        ''')
        
        # Historical changes table to track additions and removals
        conn.execute('''
        CREATE TABLE IF NOT EXISTS historical_changes (
            date DATE NOT NULL,
            ticker TEXT NOT NULL,
            name TEXT NOT NULL,
            action TEXT CHECK(action IN ('added', 'removed')),
            PRIMARY KEY (date, ticker, action)
        )
        ''')
    
    return conn

def save_to_database(
    yearly_holdings: Dict[int, pd.DataFrame],
    history: pd.DataFrame,
    db_path: str = 'sp500_holdings.db'
) -> None:
    """
    Save S&P 500 holdings data to SQLite database.
    
    Args:
        yearly_holdings: Dictionary with year as key and holdings DataFrame as value
        history: DataFrame with historical changes
        db_path: Path to the SQLite database file
    """
    conn = setup_database(db_path)
    
    try:
        with conn:
            # Extract unique company information
            companies = pd.concat([
                df[['ticker', 'name', 'cik']] for df in yearly_holdings.values()
            ]).drop_duplicates('ticker')
            
            # Save companies
            companies.to_sql('companies', conn, if_exists='replace', index=False)
            logging.info(f"Saved {len(companies)} companies to database")
            
            # Save holdings for each year
            holdings_data = []
            for year, df in yearly_holdings.items():
                for ticker in df['ticker']:
                    holdings_data.append({'year': year, 'ticker': ticker})
            
            holdings_df = pd.DataFrame(holdings_data)
            holdings_df.to_sql('holdings', conn, if_exists='replace', index=False)
            logging.info(f"Saved holdings for {len(yearly_holdings)} years to database")
            
            # Save historical changes
            history_df = history[['date', 'ticker', 'name', 'action']].copy()
            history_df.to_sql('historical_changes', conn, if_exists='replace', index=False)
            logging.info(f"Saved {len(history_df)} historical changes to database")
            
    except Exception as e:
        logging.error(f"Error saving to database: {str(e)}")
        raise
    finally:
        conn.close()

def query_holdings_for_year(year: int, db_path: str = 'sp500_holdings.db') -> pd.DataFrame:
    """
    Query holdings for a specific year.
    
    Args:
        year: Year to query
        db_path: Path to the SQLite database file
    Returns:
        DataFrame with holdings for the specified year
    """
    conn = sqlite3.connect(db_path)
    
    try:
        query = '''
        SELECT h.year, h.ticker, c.name, c.cik
        FROM holdings h
        JOIN companies c ON h.ticker = c.ticker
        WHERE h.year = ?
        ORDER BY h.ticker
        '''
        
        return pd.read_sql_query(query, conn, params=[year])
    finally:
        conn.close()

def query_company_history(ticker: str, db_path: str = 'sp500_holdings.db') -> pd.DataFrame:
    """
    Query historical changes for a specific company.
    
    Args:
        ticker: Company ticker symbol
        db_path: Path to the SQLite database file
    Returns:
        DataFrame with historical changes for the company
    """
    conn = sqlite3.connect(db_path)
    
    try:
        query = '''
        SELECT date, ticker, name, action
        FROM historical_changes
        WHERE ticker = ?
        ORDER BY date
        '''
        
        return pd.read_sql_query(query, conn, params=[ticker])
    finally:
        conn.close()

# Modify the main function to include database operations
def main():
    """Main function to generate S&P 500 holdings and save to database."""
    setup_logging()
    logging.info("Starting S&P 500 holdings generation and database creation")
    
    try:
        # Get historical data
        history, current = get_sp500_history()
        logging.info("Retrieved historical data")
        
        # Create yearly holdings
        yearly_holdings = create_yearly_holdings(history, current)
        logging.info(f"Created holdings for {len(yearly_holdings)} years")
        
        # Save to database
        save_to_database(yearly_holdings, history)
        logging.info("Saved data to database successfully")
        
        return yearly_holdings
        
    except Exception as e:
        logging.error(f"Error in main processing: {str(e)}")
        raise

if __name__ == "__main__":
    holdings = main()

2025-01-04 10:19:25,425 - INFO - Starting S&P 500 holdings generation and database creation
2025-01-04 10:19:26,099 - INFO - Retrieved historical data
2025-01-04 10:19:26,207 - INFO - Created holdings for 18 years
2025-01-04 10:19:26,333 - INFO - Saved 794 companies to database
2025-01-04 10:19:26,365 - INFO - Saved holdings for 18 years to database
2025-01-04 10:19:26,375 - INFO - Saved 964 historical changes to database
2025-01-04 10:19:26,377 - INFO - Saved data to database successfully


## Print Yearly Output from databse

In [9]:
import pandas as pd
import sqlite3

# Connect and query directly to DataFrame
conn = sqlite3.connect('sp500_holdings.db')

# Get all companies currently in the index
query = """
SELECT DISTINCT c.*
FROM companies c
JOIN holdings h ON c.ticker = h.ticker
WHERE h.year = 2024
"""
current_companies = pd.read_sql_query(query, conn)
print("\nCurrent Companies in S&P 500:")
print(current_companies)  # Display the current companies DataFrame

# Get historical changes for a specific year
query = """
SELECT *
FROM historical_changes
WHERE strftime('%Y', date) = '2023'
ORDER BY date
"""
changes_2023 = pd.read_sql_query(query, conn)
print("\nChanges made in 2023:")
print(changes_2023)  # Display the changes DataFrame

# You can also see the first few rows using head()
print("\nFirst 5 current companies:")
print(current_companies.head())

# And check the shape (number of rows and columns) of your DataFrames
print("\nDataFrame shapes:")
print(f"Current companies: {current_companies.shape}")
print(f"2023 changes: {changes_2023.shape}")

conn.close()


Current Companies in S&P 500:
    ticker                        name         cik
0      AXP            American Express  0000004962
1      HCA              HCA Healthcare  0000860730
2      CRL  Charles River Laboratories  0001100682
3      FCX            Freeport-McMoRan  0000831259
4      AIZ                    Assurant  0001267238
..     ...                         ...         ...
497    CPB       Campbell Soup Company  0000016732
498    ESS        Essex Property Trust  0000920522
499    SRE                      Sempra  0001032208
500    DAL             Delta Air Lines  0000027904
501   APTV                       Aptiv  0001521332

[502 rows x 3 columns]

Changes made in 2023:
                   date ticker                          name   action
0   2023-01-04 00:00:00   GEHC                 GE HealthCare    added
1   2023-01-05 00:00:00    VNO          Vornado Realty Trust  removed
2   2023-03-15 00:00:00     BG                  Bunge Global    added
3   2023-03-15 00:00:00   PODD