# Data Preprocessing

In [None]:
import glob
import os
import random
import time
import warnings
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime

# Suppress every warning category for the remainder of the session.
warnings.simplefilter("ignore")

In [None]:
from pathlib import Path

# Pipeline stage folders, numbered in processing order.
# NOTE(review): stages 00-20 are resolved relative to "../" while 30_prod and
# 40_features are resolved relative to the current directory. That looks
# inconsistent - confirm whether they should be "../30_prod/" and "../40_features/".
raw_data_folder: Path = Path("../00_raw/")
draft_data_processing_folder: Path = Path("../10_draft_processing/")
aggregare_draft_data_folder: Path = Path("../20_aggregate_draft/")
# Correctly spelled alias; the misspelled name above is kept for existing users.
aggregate_draft_data_folder: Path = aggregare_draft_data_folder
prod_data_folder: Path = Path("30_prod/")
features_data_folder: Path = Path("40_features/")

In [2]:
# CSV file (path relative to the working directory) whose "Ticker" column
# lists the candidate symbols to download.
TICKER_LIST_FILE: str = "ticker_list.csv"

In [None]:
import pandas as pd

start_date: str = "2010-01-01"
end_date: str = "2025-01-01"
# Weekly grid (pandas "W" = weeks ending on Sunday) that every ticker's
# Close series is aligned onto.
expected_weeks: pd.DatetimeIndex = pd.date_range(
    start=start_date, end=end_date, freq="W"
)

# process_ticker reads these settings under their upper-case names; without
# these aliases it would raise NameError.
START_DATE: str = start_date
END_DATE: str = end_date

In [None]:
# Number of accepted tickers after which the download loop stops sampling.
target_count = 3000
# The download loop compares against the upper-case name.
TARGET_COUNT = target_count

In [None]:
def safe_submit(executor, fn, *args, **kwargs):
    """
    Submit ``fn(*args, **kwargs)`` to ``executor``, retrying on thread exhaustion.

    When ``executor.submit`` raises a RuntimeError whose message contains
    "can't start new thread", print a notice, pause for 0.5 seconds and try
    again (with no retry limit). Any other RuntimeError propagates unchanged.

    Returns the Future returned by ``executor.submit``.
    """
    while True:
        try:
            future = executor.submit(fn, *args, **kwargs)
        except RuntimeError as err:
            if "can't start new thread" not in str(err):
                raise
            print(
                "Thread creation error encountered. Waiting before retrying submission."
            )
            time.sleep(0.5)
        else:
            return future

In [None]:
def process_ticker(ticker):
    """
    Download weekly prices for *ticker* and decide whether to keep it.

    Data is fetched with ``yf.download`` for START_DATE..END_DATE at a weekly
    interval. The ticker is rejected when nothing is returned, when its first
    weekly bar is dated more than six days after START_DATE (no data in the
    first week of the range), or when fewer than half of ``expected_weeks``
    have a non-null Close price.

    Returns:
        A tuple ``(ticker, series, status)`` where
          - ``series`` is a new pd.Series named *ticker* with the weekly Close
            prices forward-filled onto ``expected_weeks`` if the ticker is
            accepted, otherwise None;
          - ``status`` is ``"good"`` when accepted, else one of
            ``"download_error: <message>"``, ``"no_data"``,
            ``"insufficient_history"`` or ``"insufficient_data_points"``.
    """
    # Random jitter to stagger requests coming from concurrent workers.
    time.sleep(random.uniform(0.05, 0.2))

    # Only one thread at a time may call yf.download.
    with download_lock:
        try:
            data = yf.download(
                ticker,
                start=START_DATE,
                end=END_DATE,
                interval="1wk",
                progress=False,
                auto_adjust=True,
                threads=False,
            )
        except Exception as e:
            # Best-effort: any download failure just rejects this ticker.
            return (ticker, None, f"download_error: {e}")

    if data is None or data.empty:
        return (ticker, None, "no_data")

    # Weekly bars carry a single date per week, so even a ticker with full
    # history can have its first bar dated a few days after START_DATE.
    # Require the first bar to fall within the first week of the range rather
    # than strictly on/before START_DATE.
    first_date = data.index.min().to_pydatetime().date()
    range_start = pd.Timestamp(START_DATE).date()
    if (first_date - range_start).days > 6:
        return (ticker, None, "insufficient_history")

    # data["Close"] may itself be a one-column DataFrame; reduce it to a Series.
    series = data["Close"]
    if isinstance(series, pd.DataFrame):
        series = series.iloc[:, 0]

    # Count only weeks that actually have a price; NaN rows are missing weeks.
    if series.notna().sum() < 0.5 * len(expected_weeks):
        return (ticker, None, "insufficient_data_points")

    # reindex(method="ffill") needs a unique, monotonically increasing index:
    # drop repeated dates (keeping the last row) and sort before aligning.
    series = series[~series.index.duplicated(keep="last")].sort_index()
    series_aligned = series.reindex(expected_weeks, method="ffill")

    # Fresh Series named after the ticker, detached from the downloaded frame.
    series_new = pd.Series(
        data=series_aligned.values, index=series_aligned.index, name=ticker
    )
    return (ticker, series_new, "good")

In [None]:
def _read_ticker_column(path):
    """
    Return the tickers listed in the CSV at *path*, with NaNs dropped.

    Uses the "Ticker" column when the file has that header. Otherwise (rows
    appended with ``header=False``), the first column is used. A missing or
    empty file yields ``[]``. Any other read error is printed and also yields
    ``[]``, so resuming stays best-effort.
    """
    try:
        table = pd.read_csv(path)
        if "Ticker" in table.columns:
            column = table["Ticker"]
        else:
            # No "Ticker" header: the first ticker row was consumed as the
            # header, so re-read without one and take the first column.
            column = pd.read_csv(path, header=None).iloc[:, 0]
    except (FileNotFoundError, pd.errors.EmptyDataError):
        return []
    except Exception as exc:
        print(f"Could not read ticker list {path}: {exc}")
        return []
    return column.dropna().tolist()


def _load_latest_backup():
    """
    Load the highest-numbered ``ticker_data_backup_{n}.csv`` in DATA_BACKUP_FOLDER.

    Returns ``(combined_df, good_ticker_data, saved_count)``. When there is no
    backup, or it cannot be read, this is an empty frame indexed by
    ``expected_weeks``, ``{}`` and 0.
    """
    combined_df = pd.DataFrame(index=expected_weeks)
    good_ticker_data = {}

    backup_files = glob.glob(
        os.path.join(DATA_BACKUP_FOLDER, "ticker_data_backup_*.csv")
    )
    if not backup_files:
        print("No previous backup file found.")
        return combined_df, good_ticker_data, 0

    def extract_count(fn):
        # Files whose suffix is not an integer sort as 0.
        base = os.path.basename(fn)
        try:
            return int(base.replace("ticker_data_backup_", "").replace(".csv", ""))
        except ValueError:
            return 0

    backup_files.sort(key=extract_count)
    latest_backup = backup_files[-1]
    try:
        loaded_df = pd.read_csv(latest_backup, index_col=0, parse_dates=True)
    except Exception as e:
        print(f"Error loading backup file {latest_backup}: {e}")
        return combined_df, good_ticker_data, 0

    good_ticker_data = {ticker: loaded_df[ticker] for ticker in loaded_df.columns}
    saved_count = len(loaded_df.columns)
    print(f"Loaded backup from {latest_backup} with {saved_count} tickers.")
    return loaded_df, good_ticker_data, saved_count


def load_previous_state():
    """
    Load the state saved by a previous run so processing can resume.

    Reads SUCCESSFUL_TICKERS_FILE, UNSUCCESSFUL_TICKERS_FILE and the
    highest-numbered ``ticker_data_backup_{n}.csv`` in DATA_BACKUP_FOLDER.

    Returns:
      combined_df: DataFrame with saved ticker data (an empty frame indexed by
        expected_weeks when no backup could be loaded).
      good_ticker_data: dict mapping ticker to its pd.Series (backup columns).
      processed_tickers: dict mapping ticker to "good" or "failed". Tickers
        listed as successful but absent from the loaded backup are left out,
        so they are not mistaken for tickers whose data is already in memory.
      unsuccessful_set: set of tickers known to be unsuccessful.
      saved_count: number of tickers in the loaded backup (0 if none).
    """
    successful_tickers = _read_ticker_column(SUCCESSFUL_TICKERS_FILE)
    unsuccessful_tickers = _read_ticker_column(UNSUCCESSFUL_TICKERS_FILE)

    processed_tickers = {ticker: "good" for ticker in successful_tickers}
    # Failures are applied second so they win when a ticker is in both lists.
    for ticker in unsuccessful_tickers:
        processed_tickers[ticker] = "failed"
    unsuccessful_set = set(unsuccessful_tickers)

    combined_df, good_ticker_data, saved_count = _load_latest_backup()

    # Successes recorded after the latest backup have no data loaded; drop
    # them from processed_tickers instead of silently treating them as done.
    missing = [
        ticker
        for ticker, status in processed_tickers.items()
        if status == "good" and ticker not in good_ticker_data
    ]
    for ticker in missing:
        del processed_tickers[ticker]
    if missing:
        print(
            f"{len(missing)} successful tickers have no data in the backup "
            "and are no longer marked as processed."
        )

    return (
        combined_df,
        good_ticker_data,
        processed_tickers,
        unsuccessful_set,
        saved_count,
    )

In [None]:
# Candidate universe: every distinct, non-null value of the "Ticker" column.
tickers_df = pd.read_csv(TICKER_LIST_FILE)
all_tickers = tickers_df["Ticker"].dropna().unique().tolist()

# Resume from whatever an earlier run persisted to disk.
combined_df, good_ticker_data, processed_tickers, unsuccessful_set, saved_count = load_previous_state()
print(f"Starting with {len(good_ticker_data)} good tickers and {len(processed_tickers)} total processed tickers.")

In [None]:
def _append_row(path, row, columns):
    """
    Append a single CSV row to *path*.

    The header is written only when the file does not exist yet or is empty.
    This keeps the ticker lists readable by ``pd.read_csv(path)["Ticker"]``;
    appending with header=False to a new file would produce a header-less CSV.
    """
    write_header = not os.path.exists(path) or os.path.getsize(path) == 0
    pd.DataFrame([row], columns=columns).to_csv(
        path, mode="a", header=write_header, index=False
    )


def _record_failure(ticker, status):
    """Append *ticker*/*status* to the unsuccessful list once and remember it."""
    if ticker not in unsuccessful_set:
        _append_row(UNSUCCESSFUL_TICKERS_FILE, [ticker, status], ["Ticker", "Status"])
        unsuccessful_set.add(ticker)


# Create the output folders up front so backups and final files can be written.
for _output_folder in (DATA_BACKUP_FOLDER, TICKER_LIST_FOLDER):
    if _output_folder:
        os.makedirs(_output_folder, exist_ok=True)

try:
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        # Exhaustion is detected via `remaining` below rather than by comparing
        # len(processed_tickers) with len(all_tickers): processed_tickers is
        # loaded from files and may contain tickers not in all_tickers, which
        # would end the loop early.
        while len(good_ticker_data) < TARGET_COUNT:
            # Only consider tickers not already processed.
            remaining = list(set(all_tickers) - set(processed_tickers))
            if not remaining:
                print("No more tickers remaining to sample.")
                break

            batch = random.sample(remaining, min(20, len(remaining)))
            future_to_ticker = {}

            # Mark tickers as scheduled immediately to avoid duplicates in future batches.
            for ticker in batch:
                processed_tickers[ticker] = "scheduled"
                future = safe_submit(executor, process_ticker, ticker)
                future_to_ticker[future] = ticker

            for future in as_completed(future_to_ticker):
                ticker = future_to_ticker[future]
                try:
                    ticker_symbol, series, status = future.result()
                except Exception as exc:
                    processed_tickers[ticker] = f"exception: {exc}"
                    _record_failure(ticker, f"exception: {exc}")
                    continue

                processed_tickers[ticker_symbol] = status

                if status == "good" and series is not None:
                    combined_df[ticker_symbol] = series.copy()
                    good_ticker_data[ticker_symbol] = series.copy()
                    print(f"Ticker {ticker_symbol} accepted. Total good tickers: {len(good_ticker_data)}")
                    _append_row(SUCCESSFUL_TICKERS_FILE, [ticker_symbol], ["Ticker"])
                else:
                    _record_failure(ticker_symbol, status)

            # Short pause between batches to allow thread cleanup.
            time.sleep(0.1)

            # Write a numbered snapshot once BACKUP_INTERVAL new tickers were accepted.
            if (len(good_ticker_data) - saved_count) >= BACKUP_INTERVAL:
                backup_csv = os.path.join(DATA_BACKUP_FOLDER, f"ticker_data_backup_{len(good_ticker_data)}.csv")
                combined_df.to_csv(backup_csv)
                print(f"Backup saved after reaching {len(good_ticker_data)} good tickers.")
                saved_count = len(good_ticker_data)
except KeyboardInterrupt:
    print("Keyboard interrupt received. Shutting down threads gracefully.")
finally:
    # Always persist the final state, including after an interrupt or error.
    final_data_csv = os.path.join(DATA_BACKUP_FOLDER, "ticker_data_final.csv")
    final_processed_csv = os.path.join(TICKER_LIST_FOLDER, "processed_tickers_final.csv")
    combined_df.to_csv(final_data_csv)
    pd.DataFrame(list(processed_tickers.items()), columns=["Ticker", "Status"]).to_csv(final_processed_csv, index=False)
    print(f"Process complete. {len(good_ticker_data)} good tickers saved.")