# 4.5 Factor Analysis on Financial and Economic Time Series

Factor Analysis and Principal Component Analysis on Financial and Economic Time Series

In [None]:
# If you're running this on Colab, make sure to install the following packages using pip.
# On you're own computer, I recommend using conda or mamba.

# !pip install pandas-datareader
# !pip install yfinance

# !conda install pandas-datareader
# !conda install yfinance

In [None]:
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt

import yfinance as yf
import pandas_datareader as pdr
import sklearn.decomposition
import statsmodels.multivariate.pca

In [None]:
import config

DATA_DIR = config.DATA_DIR

## Downloading macroeconomic and financial data from FRED

In [None]:
fred_series_long_names = {
    "BAMLH0A0HYM2": "ICE BofA US High Yield Index Option-Adjusted Spread",
    "NASDAQCOM": "NASDAQ Composite Index",
    "RIFSPPFAAD90NB": "90-Day AA Financial Commercial Paper Interest Rate",
    "TB3MS": "3-Month Treasury Bill Secondary Market Rate",
    "DGS10": "Market Yield on U.S. Treasury Securities at 10-Year Constant Maturity",
    "VIXCLS": "CBOE Volatility Index: VIX",
}

In [None]:
fred_series_short_names = {
    "BAMLH0A0HYM2": "High Yield Index OAS",
    "NASDAQCOM": "NASDAQ",
    "RIFSPPFAAD90NB": "90-Day AA Fin CP",
    "TB3MS": "3-Month T-Bill",
    "DGS10": "10-Year Treasury",
    "VIXCLS": "VIX",
}

In [None]:
start_date = pd.to_datetime("1980-01-01")
end_date = pd.to_datetime("today")

In [None]:
df = pdr.get_data_fred(fred_series_short_names.keys(), start=start_date, end=end_date)

First, an aside about reading and writing data to disk.

In [None]:
df.to_csv(DATA_DIR / "fred_panel.csv")

In [None]:
dff = pd.read_csv(DATA_DIR / "fred_panel.csv")

In [None]:
dff.info()

In [None]:
dff = pd.read_csv(DATA_DIR / "fred_panel.csv", parse_dates=["DATE"])

In [None]:
dff.info()

In [None]:
dff = dff.set_index("DATE")

In [None]:
df.to_parquet(DATA_DIR / "fred_panel.parquet")

In [None]:
df = pd.read_parquet(DATA_DIR / "fred_panel.parquet")

In [None]:
df.info()

In [None]:
df

## Cleaning Data


In [None]:
df = dff.rename(columns=fred_series_short_names)
df

Balanced panel? Mixed frequencies?

In [None]:
df["3-Month T-Bill"].dropna()

Find a daily version of this series. See here: https://fred.stlouisfed.org/categories/22

We will end up using this series: https://fred.stlouisfed.org/series/DTB3

In [None]:
fred_series_short_names = {
    "BAMLH0A0HYM2": "High Yield Index OAS",
    "NASDAQCOM": "NASDAQ",
    "RIFSPPFAAD90NB": "90-Day AA Fin CP",
    "DTB3": "3-Month T-Bill",
    "DGS10": "10-Year Treasury",
    "VIXCLS": "VIX",
}
df = pdr.get_data_fred(fred_series_short_names.keys(), start=start_date, end=end_date)
df = df.rename(columns=fred_series_short_names)

In [None]:
df

In [None]:
df.dropna()

## Transforming and Normalizing the data

What is transformation and normalization? Are these different things?

 - Why would one transform data? What is feature engineering?
 - What is normalization?

What does stationarity mean? See the the following plots. Some of these variable are stationary. Other are not? Why is this a problem?

In [None]:
df.plot()

In [None]:
df.info()

In [None]:
df.drop(columns=["NASDAQ"]).plot()

Let's try some transformations like those used in the OFR Financial Stress Index: https://www.financialresearch.gov/financial-stress-index/files/indicators/index.html

In [None]:
dfn = pd.DataFrame().reindex_like(df)
dfn

In [None]:
df["NASDAQ"].rolling(250).mean()

In [None]:
df = df.dropna()

In [None]:
df["NASDAQ"].rolling(250).mean()

In [None]:
# 'High Yield Index OAS': Leave as is
dfn["High Yield Index OAS"] = df["High Yield Index OAS"]
dfn["CP - Treasury Spread, 3m"] = df["90-Day AA Fin CP"] - df["3-Month T-Bill"]
# 'NASDAQ':  # We're using something different, but still apply rolling mean transformation
dfn["NASDAQ"] = np.log(df["NASDAQ"]) - np.log(df["NASDAQ"].rolling(250).mean())
dfn["10-Year Treasury"] = (
    df["10-Year Treasury"] - df["10-Year Treasury"].rolling(250).mean()
)
# 'VIX': Leave as is
dfn["VIX"] = df["VIX"]

In [None]:
dfn = dfn.drop(columns=["90-Day AA Fin CP", "3-Month T-Bill"])
dfn = dfn.dropna()

In [None]:
dfn.info()

We finished with our transformations. Now, let's normalize. First, why is it important?

In [None]:
dfn.plot()

Now, normalize each column,
$$
z = \frac{x - \bar x}{\text{std}(x)}
$$

In [None]:
dfn = (dfn - dfn.mean()) / dfn.std()

In [None]:
dfn.plot()

In [None]:
def pca(dfn, module="scikitlearn"):
    if module == "statsmodels":
        _pc1, _loadings, projection, rsquare, _, _, _ = (
            statsmodels.multivariate.pca.pca(
                dfn,
                ncomp=1,
                standardize=True,
                demean=True,
                normalize=True,
                gls=False,
                weights=None,
                method="svd",
            )
        )
        _loadings = _loadings["comp_0"]
        loadings = np.std(_pc1) * _loadings
        pc1 = _pc1 / np.std(_pc1)
        pc1 = pc1.rename(columns={"comp_0": "PC1"})["PC1"]

    elif module == "scikitlearn":
        pca = sklearn.decomposition.PCA(n_components=1)
        _pc1 = pd.Series(pca.fit_transform(dfn)[:, 0], index=dfn.index, name="PC1")
        _loadings = pca.components_.T * np.sqrt(pca.explained_variance_)
        _loadings = pd.Series(_loadings[:, 0], index=dfn.columns)

        loadings = np.std(_pc1) * _loadings
        pc1 = _pc1 / np.std(_pc1)
        pc1.name = "PC1"
    else:
        raise ValueError

    loadings.name = "loadings"

    return pc1, loadings


def stacked_plot(df, filename=None):
    """
    df=category_contributions
    # category_contributions.sum(axis=1).plot()
    """

    df_pos = df[df >= 0]
    df_neg = df[df < 0]

    alpha = 0.3
    linewidth = 0.5

    ax = df_pos.plot.area(alpha=alpha, linewidth=linewidth, legend=False)
    pc1 = df.sum(axis=1)
    pc1.name = "pc1"
    pc1.plot(color="Black", label="pc1", linewidth=1)

    plt.legend()
    ax.set_prop_cycle(None)
    df_neg.plot.area(
        alpha=alpha, ax=ax, linewidth=linewidth, legend=False, ylim=(-3, 3)
    )
    # recompute the ax.dataLim
    ax.relim()
    # update ax.viewLim using the new dataLim
    ax.autoscale()
    # ax.set_ylabel('Standard Deviations')
    # ax.set_ylim(-3,4)
    # ax.set_ylim(-30,30)

    if filename is not None:
        filename = Path(filename)
        figure = plt.gcf()  # get current figure
        figure.set_size_inches(8, 6)
        plt.savefig(filename, dpi=300)

In [None]:
pc1, loadings = pca(dfn, module="scikitlearn")

In [None]:
pc1.plot()

In [None]:
stacked_plot(dfn)

Let's compare solutions from two different packages

In [None]:
def root_mean_squared_error(sa, sb):
    return np.sqrt(np.mean((sa - sb) ** 2))


pc1_sk, loadings_sk = pca(dfn, module="scikitlearn")
pc1_sm, loadings_sm = pca(dfn, module="statsmodels")
root_mean_squared_error(pc1_sm, pc1_sk)

## Factor Analysis of a Panel of Stock Returns?

In [None]:
# Download sample data for multiple tickers
# Note: yfinance may return different structures depending on version and number of tickers
sample = yf.download(
    "SPY AAPL MSFT", start="2017-01-01", end="2017-04-30", progress=False
)

In [None]:
# Let's examine the structure of the downloaded data
print(
    "Sample columns:",
    sample.columns.tolist() if hasattr(sample, "columns") else "No columns attribute",
)
print(
    "Sample shape:", sample.shape if hasattr(sample, "shape") else "No shape attribute"
)
if hasattr(sample, "columns") and isinstance(sample.columns, pd.MultiIndex):
    print("Column levels:", sample.columns.levels)
    print("First level values:", sample.columns.levels[0].tolist())

In [None]:
sample

In [None]:
# When downloading multiple tickers, yfinance returns a DataFrame with MultiIndex columns
# The first level is the data type (e.g., 'Adj Close'), the second level is the ticker
# Display the adjusted close prices for all tickers
adj_close_data = (
    sample["Adj Close"] if "Adj Close" in sample.columns.get_level_values(0) else sample
)
adj_close_data

In [None]:
tickers = [
    "AAPL",
    "ABBV",
    "ABT",
    "ACN",
    "ADP",
    "ADSK",
    "AES",
    "AET",
    "AFL",
    "AMAT",
    "AMGN",
    "AMZN",
    "APA",
    "APHA",
    "APD",
    "APTV",
    "ARE",
    "ASML",
    "ATVI",
    "AXP",
    "BA",
    "BAC",
    "BAX",
    "BDX",
    "BIIB",
    "BK",
    "BKNG",
    "BMY",
    "BRKB",
    "BRK.A",
    "COG",
    "COST",
    "CPB",
    "CRM",
    "CSCO",
    "CVS",
    "DAL",
    "DD",
    "DHR",
    "DIS",
    "DOW",
    "DUK",
    "EMR",
    "EPD",
    "EQT",
    "ESRT",
    "EXPD",
    "FFIV",
    "FLS",
    "FLT",
    "FRT",
    "GE",
    "GILD",
    "GOOGL",
    "GOOG",
    "GS",
    "HAL",
    "HD",
    "HON",
    "IBM",
    "INTC",
    "IP",
    "JNJ",
    "JPM",
    "KEY",
    "KHC",
    "KIM",
    "KO",
    "LLY",
    "LMT",
    "LOW",
    "MCD",
    "MCHP",
    "MDT",
    "MMM",
    "MO",
    "MRK",
    "MSFT",
    "MTD",
    "NEE",
    "NFLX",
    "NKE",
    "NOV",
    "ORCL",
    "OXY",
    "PEP",
    "PFE",
    "PG",
    "RTN",
    "RTX",
    "SBUX",
    "SHW",
    "SLB",
    "SO",
    "SPG",
    "STT",
    "T",
    "TGT",
    "TXN",
    "UNH",
    "UPS",
    "USB",
    "UTX",
    "V",
    "VZ",
    "WMT",
    "XOM",
]

In [None]:
all_tickers = " ".join(tickers)
all_tickers

In [None]:
data = yf.download(
    all_tickers, start="1980-01-01", end=pd.to_datetime("today"), progress=False
)

In [None]:
data

In [None]:
data["Close"]["AAPL"].plot()

In [None]:
cols_with_many_nas = [
    "BRK.A",
    "APHA",
    "UTX",
    "RTN",
    "COG",
    "BRKB",
    "ATVI",
    "FLT",
    "DOW",
    "KHC",
    "V",
    "APTV",
    "ABBV",
    "ESRT",
]
df = data["Close"]
print(f"Initial shape: {df.shape}")
df = df.drop(columns=cols_with_many_nas, errors="ignore")
print(f"After dropping columns: {df.shape}")
df = df.dropna()
print(f"After first dropna: {df.shape}")
df = df.pct_change()
print(f"After pct_change: {df.shape}")
df = df.dropna()
print(f"Final shape: {df.shape}")

# If DataFrame is empty, use a smaller date range or fewer tickers
if df.empty:
    print("DataFrame is empty! Trying with fewer tickers and recent data...")
    simple_tickers = ["AAPL", "MSFT", "GOOGL", "AMZN", "TSLA"]
    data = yf.download(
        " ".join(simple_tickers),
        start="2020-01-01",
        end=pd.to_datetime("today"),
        progress=False,
    )
    df = data["Close"].pct_change().dropna()
    print(f"New shape with simple tickers: {df.shape}")

In [None]:
df.columns

In [None]:
df["AAPL"].plot()

In [None]:
if not df.empty:
    pc1, loadings = pca(df, module="scikitlearn")
    print(f"PCA completed successfully. PC1 shape: {pc1.shape}")
else:
    print("Cannot run PCA on empty DataFrame!")
    pc1 = pd.Series(dtype=float)
    loadings = pd.Series(dtype=float)

In [None]:
if not pc1.empty:
    pc1.plot()
else:
    print("No data to plot for PC1")

In [None]:
if not pc1.empty:
    pc1.cumsum().plot()
else:
    print("No data to plot for cumulative PC1")