In [8]:
# Import required libraries
import yfinance as yf
import pandas as pd
import numpy as np
from datetime import datetime, date
from pathlib import Path
import logging
from tqdm import tqdm
import warnings
import pytz

# Configure logging
log_file = f'../reports/data_quality_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log'
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(log_file),
        logging.StreamHandler()
    ]
)

# Create necessary directories
for dir_path in ['../data/stocks', '../data/processed', '../reports']:
    Path(dir_path).mkdir(parents=True, exist_ok=True)

# Define parameters
START_DATE = '2015-01-01'
END_DATE = date.today().strftime('%Y-%m-%d')
INTERVAL = '1d'
MIN_HISTORY_DAYS = 1000
REQUIRED_COLUMNS = ['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Dividends', 'Stock Splits']

# Suppress warnings
warnings.filterwarnings('ignore', category=FutureWarning)
warnings.filterwarnings('ignore', category=UserWarning)

# Set timezone for consistency
UTC = pytz.UTC

logging.info(f"Data collection configured for period: {START_DATE} to {END_DATE}")
print(f"Logs will be saved to: {log_file}")


2025-07-26 07:01:15,648 - INFO - Data collection configured for period: 2015-01-01 to 2025-07-26


Logs will be saved to: ../reports/data_quality_20250726_070115.log


In [9]:
# Define data quality functions
def detect_outliers(df, columns=['Open', 'High', 'Low', 'Close'], threshold=3.0):  # Increased threshold
    """
    Detect outliers using IQR method with a more lenient threshold.
    Returns dictionary with outlier counts per column.
    """
    outliers = {}
    for col in columns:
        # Calculate rolling median and IQR to handle trends
        rolling_median = df[col].rolling(window=20, center=True).median()
        Q1 = df[col].rolling(window=20, center=True).quantile(0.25)
        Q3 = df[col].rolling(window=20, center=True).quantile(0.75)
        IQR = Q3 - Q1
        
        # Handle zero/small IQR cases
        IQR = np.where(IQR < 1e-10, df[col].std(), IQR)
        
        lower = rolling_median - threshold * IQR
        upper = rolling_median + threshold * IQR
        
        outliers[col] = len(df[(df[col] < lower) | (df[col] > upper)])
    return outliers

def validate_price_data(df):
    """
    Validate price data for common issues.
    Returns list of validation issues.
    """
    issues = []
    
    # Check for negative prices
    if (df[['Open', 'High', 'Low', 'Close']] < 0).any().any():  # Changed from <= 0 to < 0
        issues.append("Found negative prices")
    
    # Check price relationships with tolerance
    tolerance = df['Close'].rolling(window=20).std() * 0.01  # 1% of rolling std as tolerance
    tolerance = tolerance.fillna(df['Close'].std() * 0.01)
    
    invalid_prices = (
        (df['High'] < df['Low'] - tolerance) |
        (df['Open'] > df['High'] + tolerance) |
        (df['Open'] < df['Low'] - tolerance) |
        (df['Close'] > df['High'] + tolerance) |
        (df['Close'] < df['Low'] - tolerance)
    )
    if invalid_prices.any():
        issues.append(f"Found {invalid_prices.sum()} invalid price relationships")
    
    # Check for extreme price changes with adaptive threshold
    daily_returns = df['Close'].pct_change()
    rolling_std = daily_returns.rolling(window=20).std()
    threshold = rolling_std * 5  # 5 standard deviations
    threshold = threshold.fillna(daily_returns.std() * 5)
    extreme_returns = (daily_returns.abs() > threshold)
    if extreme_returns.any():
        issues.append(f"Found {extreme_returns.sum()} extreme price changes")
    
    return issues

def handle_outliers(df, columns=['Open', 'High', 'Low', 'Close']):
    """
    Handle outliers using Winsorization with adaptive thresholds.
    Returns a new DataFrame with outliers handled.
    """
    df_clean = df.copy()
    
    for col in columns:
        # Calculate rolling statistics
        rolling_median = df[col].rolling(window=20, center=True).median()
        rolling_std = df[col].rolling(window=20, center=True).std()
        
        # Fill NaN values with overall statistics
        rolling_median = rolling_median.fillna(df[col].median())
        rolling_std = rolling_std.fillna(df[col].std())
        
        # Set threshold as 4 standard deviations
        lower = rolling_median - 4 * rolling_std
        upper = rolling_median + 4 * rolling_std
        
        # Winsorize the data
        df_clean[col] = df[col].clip(lower=lower, upper=upper)
    
    return df_clean

def validate_stock_data(df, ticker, sector):
    """
    Comprehensive data validation for a single stock.
    Returns tuple of (is_valid, issues).
    """
    issues = []
    
    # Check data completeness
    if len(df) < MIN_HISTORY_DAYS:
        issues.append(f"Insufficient history: {len(df)} days < {MIN_HISTORY_DAYS}")
    
    # Check for missing values
    missing_values = df.isnull().sum()
    if missing_values.any():
        issues.append(f"Missing values: {dict(missing_values[missing_values > 0])}")
    
    # Check for future dates
    current_date = pd.Timestamp(END_DATE, tz=UTC)
    if df['Date'].max() > current_date:
        issues.append(f"Found future dates: {df['Date'].max().strftime('%Y-%m-%d')}")
    
    # Detect outliers (only consider it an issue if more than 5% of data points are outliers)
    outliers = detect_outliers(df)
    significant_outliers = {k: v for k, v in outliers.items() if v > len(df) * 0.05}
    if significant_outliers:
        for col, count in significant_outliers.items():
            issues.append(f"{count} potential outliers in {col} prices")
    
    # Validate price data
    price_issues = validate_price_data(df)
    issues.extend(price_issues)
    
    # Only consider validation failed if there are serious issues
    serious_issues = [
        issue for issue in issues 
        if "Missing values" in issue 
        or "Insufficient history" in issue 
        or "future dates" in issue
    ]
    
    return len(serious_issues) == 0, issues


In [10]:
# Function to fetch and clean data for a single ticker
def fetch_stock_data(ticker, sector):
    """
    Fetch and clean stock data for a given ticker.
    
    Args:
        ticker (str): Stock ticker symbol
        sector (str): Stock sector
        
    Returns:
        pd.DataFrame or None: Cleaned stock data if successful, None if failed
    """
    try:
        output_path = f'../data/stocks/{ticker}.csv'
        
        # Check if file already exists
        if Path(output_path).exists():
            df = pd.read_csv(output_path)
            df['Date'] = pd.to_datetime(df['Date'], utc=True)  # Properly handle timezone
            logging.info(f"Loaded existing data for {ticker}")
        else:
            logging.info(f"Downloading data for {ticker} ({sector})")
            # Fetch data
            stock = yf.Ticker(ticker)
            df = stock.history(
                start=START_DATE,
                end=END_DATE,
                interval=INTERVAL
            )
            
            if len(df) == 0:
                logging.error(f"No data downloaded for {ticker}")
                return None
                
            # Reset index to make Date a column
            df = df.reset_index()
            df['Date'] = pd.to_datetime(df['Date'], utc=True)  # Ensure timezone awareness
            
        # Add ticker and sector columns
        df['Ticker'] = ticker
        df['Sector'] = sector
        
        # Verify required columns
        missing_cols = set(REQUIRED_COLUMNS) - set(df.columns)
        if missing_cols:
            logging.warning(f"{ticker}: Missing columns: {missing_cols}")
            return None
            
        # Clean data
        df = df.dropna(subset=['Close', 'Volume'])  # Remove rows with missing critical data
        
        # Validate data
        is_valid, issues = validate_stock_data(df, ticker, sector)
        
        if not is_valid:
            for issue in issues:
                logging.warning(f"{ticker} data quality issues:")
                logging.warning(f"- {issue}")
            
            # Handle outliers if they exist
            if any("outliers" in issue for issue in issues):
                df = handle_outliers(df)
                logging.info(f"Applied outlier handling for {ticker}")
            
            validation_results['failed'].append(ticker)
        else:
            logging.info(f"Validated {ticker} data: {len(df)} rows")
            validation_results['passed'].append(ticker)
            
        # Save processed data
        df.to_csv(output_path, index=False)
        
        return df
        
    except Exception as e:
        logging.error(f"Error processing {ticker}: {str(e)}")
        validation_results['excluded'].append(ticker)
        return None


In [11]:
# Load sector mapping and initialize results tracking
try:
    # Define GICS sectors and their ETFs
    SECTOR_MAP = {
        'XLK': 'Information Technology',
        'XLF': 'Financials',
        'XLV': 'Healthcare',
        'XLE': 'Energy',
        'XLY': 'Consumer Discretionary',
        'XLP': 'Consumer Staples',
        'XLI': 'Industrials',
        'XLB': 'Materials',
        'XLU': 'Utilities',
        'XLRE': 'Real Estate',
        'XLC': 'Communication Services'
    }
    
    # Load and clean sector mapping
    sector_mapping = pd.read_csv('../data/sector_mapping.csv')
    
    # Standardize sector names
    sector_name_map = {
        'Technology': 'Information Technology',
        'Technology ': 'Information Technology',
        'Consumer_Discretionary': 'Consumer Discretionary',
        'Consumer_Staples': 'Consumer Staples',
        'Communication_Services': 'Communication Services'
    }
    
    # Clean sector names
    sector_mapping['Sector'] = sector_mapping['Sector'].str.strip()
    sector_mapping['Sector'] = sector_mapping['Sector'].replace(sector_name_map)
    
    # Get unique tickers
    tickers = sector_mapping['Ticker'].unique()
    logging.info(f"Loaded {len(tickers)} tickers from sector mapping")
    
    print("\nSector distribution:")
    sector_counts = sector_mapping['Sector'].value_counts()
    for sector, count in sector_counts.items():
        print(f"- {sector}: {count} stocks")
        
except Exception as e:
    logging.error(f"Error loading sector mapping: {str(e)}")
    raise Exception("Cannot proceed without sector mapping")

# Dictionary to store validation results
validation_results = {
    'passed': [],
    'failed': [],
    'excluded': []
}

# Dictionary to store processed data
stock_data = {}

2025-07-26 07:01:15,780 - INFO - Loaded 109 tickers from sector mapping



Sector distribution:
- Information Technology: 52 stocks
- Consumer Discretionary: 14 stocks
- Healthcare: 11 stocks
- Industrials: 7 stocks
- Consumer Staples: 6 stocks
- Communication Services: 6 stocks
- Utilities: 3 stocks
- Materials: 3 stocks
- Real Estate: 3 stocks
- Financials: 2 stocks
- Energy: 2 stocks


In [12]:
# Process each ticker
for ticker in tqdm(tickers, desc="Processing stocks"):
    try:
        # Get sector for the ticker
        sector = sector_mapping[sector_mapping['Ticker'] == ticker]['Sector'].iloc[0]
        
        # Fetch and process data
        df = fetch_stock_data(ticker, sector)
        
        if df is not None:
            stock_data[ticker] = df
            
    except Exception as e:
        logging.error(f"Error processing {ticker}: {str(e)}")
        validation_results['excluded'].append(ticker)
        continue

# Print summary
print("\nValidation Summary:")
print(f"- Passed: {len(validation_results['passed'])} stocks")
print(f"- Failed: {len(validation_results['failed'])} stocks")
print(f"- Excluded: {len(validation_results['excluded'])} stocks")

if validation_results['failed']:
    print("\nFailed validations:")
    for ticker in validation_results['failed']:
        sector = sector_mapping[sector_mapping['Ticker'] == ticker]['Sector'].iloc[0]
        print(f"- {ticker} ({sector})")

if validation_results['excluded']:
    print("\nExcluded stocks:")
    for ticker in validation_results['excluded']:
        sector = sector_mapping[sector_mapping['Ticker'] == ticker]['Sector'].iloc[0]
        print(f"- {ticker} ({sector})")

print(f"\nDetailed validation report saved to: {log_file}")


Processing stocks:   0%|          | 0/109 [00:00<?, ?it/s]2025-07-26 07:01:15,843 - INFO - Loaded existing data for AAPL
2025-07-26 07:01:15,888 - INFO - Validated AAPL data: 2651 rows
Processing stocks:   1%|          | 1/109 [00:00<00:12,  8.55it/s]2025-07-26 07:01:15,944 - INFO - Loaded existing data for MSFT
2025-07-26 07:01:15,973 - INFO - Validated MSFT data: 2651 rows
2025-07-26 07:01:16,013 - INFO - Loaded existing data for AMZN
2025-07-26 07:01:16,038 - INFO - Validated AMZN data: 2651 rows
Processing stocks:   3%|▎         | 3/109 [00:00<00:08, 12.17it/s]2025-07-26 07:01:16,078 - INFO - Loaded existing data for NVDA
2025-07-26 07:01:16,103 - INFO - Validated NVDA data: 2651 rows
2025-07-26 07:01:16,141 - INFO - Loaded existing data for META
2025-07-26 07:01:16,169 - INFO - Validated META data: 2651 rows
Processing stocks:   5%|▍         | 5/109 [00:00<00:08, 12.26it/s]2025-07-26 07:01:16,265 - INFO - Loaded existing data for GOOGL
2025-07-26 07:01:16,312 - INFO - Validated GO


Validation Summary:
- Passed: 106 stocks
- Failed: 2 stocks
- Excluded: 0 stocks

Failed validations:
- GFS (Information Technology)
- RIVN (Consumer Discretionary)

Detailed validation report saved to: ../reports/data_quality_20250726_070115.log





In [13]:
# Generate data quality metrics
def analyze_data_quality():
    """
    Generate comprehensive data quality metrics.
    """
    if not stock_data:
        logging.error("No data available for quality analysis")
        return pd.DataFrame()
        
    quality_stats = []
    
    for ticker, df in stock_data.items():
        try:
            # Calculate basic statistics
            stats = {
                'Ticker': ticker,
                'Sector': df['Sector'].iloc[0],
                'Total_Days': len(df),
                'Start_Date': df['Date'].min().strftime('%Y-%m-%d'),
                'End_Date': df['Date'].max().strftime('%Y-%m-%d'),
                'Missing_Values': df.isnull().sum().sum(),
                'Trading_Days_Per_Year': len(df) / ((df['Date'].max() - df['Date'].min()).days / 365)
            }
            
            # Calculate price statistics
            for col in ['Open', 'High', 'Low', 'Close']:
                stats[f'{col}_Mean'] = df[col].mean()
                stats[f'{col}_Std'] = df[col].std()
                stats[f'{col}_Min'] = df[col].min()
                stats[f'{col}_Max'] = df[col].max()
            
            quality_stats.append(stats)
            
        except Exception as e:
            logging.error(f"Error calculating metrics for {ticker}: {str(e)}")
            continue
    
    if not quality_stats:
        logging.error("No quality statistics generated")
        return pd.DataFrame()
    
    quality_df = pd.DataFrame(quality_stats)
    
    try:
        # Save quality metrics
        quality_df.to_csv('../data/processed/data_quality_metrics.csv', index=False)
        
        # Print summary statistics
        print("\nData Quality Metrics:")
        print(f"Average trading days: {quality_df['Total_Days'].mean():.1f}")
        print(f"Date range: {quality_df['Start_Date'].min()} to {quality_df['End_Date'].max()}")
        
        # Analyze by sector
        print("\nSector-wise Statistics:")
        sector_stats = quality_df.groupby('Sector').agg({
            'Total_Days': ['mean', 'min', 'max'],
            'Missing_Values': 'sum'
        }).round(2)
        
        print(sector_stats)
        
    except Exception as e:
        logging.error(f"Error generating quality metrics summary: {str(e)}")
    
    return quality_df

# Generate quality metrics
quality_metrics = analyze_data_quality()

if not quality_metrics.empty:
    print("\nData quality analysis complete!")
    print("Detailed metrics saved to: ../data/processed/data_quality_metrics.csv")
else:
    print("\nNo quality metrics generated due to data processing errors.")



Data Quality Metrics:
Average trading days: 2463.2
Date range: 2015-01-02 to 2025-07-25

Sector-wise Statistics:
                       Total_Days             Missing_Values
                             mean   min   max            sum
Sector                                                      
Communication Services    2651.00  2651  2651              0
Consumer Discretionary    2277.07   929  2651              0
Consumer Staples          2630.00  2525  2651              0
Energy                    2651.00  2651  2651              0
Financials                1800.50  1076  2525              0
Healthcare                2561.00  1661  2651              0
Industrials               2651.00  2651  2651              0
Information Technology    2410.67   938  2656              0
Materials                 2656.00  2656  2656              0
Real Estate               2656.00  2656  2656              0
Utilities                 2651.00  2651  2651              0

Data quality analysis complete!

In [14]:
# Save merged dataset
if stock_data:
    try:
        # Concatenate all validated dataframes
        merged_df = pd.concat(stock_data.values(), axis=0, ignore_index=True)
        
        # Sort by date, sector, and ticker
        merged_df = merged_df.sort_values(['Date', 'Sector', 'Ticker'])
        
        # Save merged dataset
        output_path = '../data/processed/nasdaq_stocks_merged.csv'
        merged_df.to_csv(output_path, index=False)
        logging.info(f"Saved merged dataset to {output_path}")
        
        # Generate summary report
        with open('../reports/data_collection_summary.txt', 'w') as f:
            f.write("NASDAQ-100 Data Collection Summary\n")
            f.write("=" * 50 + "\n\n")
            
            f.write("Data Coverage:\n")
            f.write(f"- Start Date: {merged_df['Date'].min().strftime('%Y-%m-%d')}\n")
            f.write(f"- End Date: {merged_df['Date'].max().strftime('%Y-%m-%d')}\n")
            f.write(f"- Total Trading Days: {len(merged_df['Date'].unique()):,}\n\n")
            
            f.write("Stock Coverage:\n")
            f.write(f"- Total Stocks: {len(stock_data)}\n")
            f.write(f"- Passed Validation: {len(validation_results['passed'])}\n")
            f.write(f"- Failed Validation: {len(validation_results['failed'])}\n")
            f.write(f"- Excluded: {len(validation_results['excluded'])}\n\n")
            
            f.write("Sector Coverage:\n")
            for sector, count in sector_counts.items():
                f.write(f"- {sector}: {count} stocks\n")
            
        print("\nData collection and validation complete!")
        print(f"- Merged dataset saved to: {output_path}")
        print("- Summary report saved to: ../reports/data_collection_summary.txt")
        print(f"- Detailed log saved to: {log_file}")
        
    except Exception as e:
        logging.error(f"Error saving merged dataset: {str(e)}")
else:
    logging.error("No validated data available to merge")


2025-07-26 07:01:42,043 - INFO - Saved merged dataset to ../data/processed/nasdaq_stocks_merged.csv



Data collection and validation complete!
- Merged dataset saved to: ../data/processed/nasdaq_stocks_merged.csv
- Summary report saved to: ../reports/data_collection_summary.txt
- Detailed log saved to: ../reports/data_quality_20250726_070115.log
