In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from statsmodels.tsa.seasonal import seasonal_decompose
from statsmodels.tsa.stattools import adfuller

In [None]:
# Read both raw CSV inputs; each file's date column is parsed and becomes the index.
daily_treasury_yield_curve_rate_df = pd.read_csv(
    'daily_treasury_yield_curve_rate.csv',
    index_col='Date',
    parse_dates=['Date'],
)
baa10y_df = pd.read_csv(
    'BAA10Y.csv',
    index_col='DATE',
    parse_dates=['DATE'],
)

In [None]:
def prepare_data(df, interest_periods, data_source,
                 start_date='1990-01-01', end_date='2020-01-01',
                 fill=False, add_freq=False, freq='3m'):
    """Select column(s) from a raw frame, clean them and restrict them to a date range.

    Parameters
    ----------
    df : pandas.DataFrame
        Raw input. Its index is sliced with ``start_date``/``end_date`` labels,
        so it is expected to be date-like.
    interest_periods : label or list of labels
        Column(s) of ``df`` to extract.
    data_source : {'yield', 'baa10y'}
        ``'yield'`` takes the column(s) as they are. ``'baa10y'`` additionally
        turns ``'.'`` cells into NaN, drops missing rows and casts to float.
    start_date, end_date : label
        Inclusive bounds of the slice applied to the sorted index.
    fill : bool
        When true, forward-fill missing values at the very end.
    add_freq : bool
        When true, reindex the result with ``asfreq(freq=freq)``.
    freq : str
        Frequency alias handed to ``asfreq``; only used when ``add_freq`` is true.
        NOTE(review): '3m' is presumably meant as a three-month step; confirm the
        alias is accepted by the installed pandas version.

    Returns
    -------
    pandas.Series or pandas.DataFrame
        The selected data, sorted by index and limited to the requested range.

    Raises
    ------
    ValueError
        If ``data_source`` is not one of the supported values.
    """
    if data_source == 'yield':
        data = df[interest_periods]
    elif data_source == 'baa10y':
        # '.' cells are treated as missing values before the float conversion.
        data = df[interest_periods].replace('.', np.nan).dropna().astype(float)
    else:
        raise ValueError(
            "data_source must be 'yield' or 'baa10y', got {!r}".format(data_source))

    # Sort first: label slicing on a descending or unsorted index can return
    # nothing (or fail) even though the requested dates are present.
    data = data.sort_index(axis=0).loc[start_date:end_date]

    if add_freq:
        data = data.asfreq(freq=freq)

    if fill:
        # Same result as fillna(method='ffill'), without the deprecated keyword.
        data = data.ffill()

    return data

In [2]:
PERIODS = ['3 Mo', '1 Yr', '5 Yr', '10 Yr', '30 Yr']

# Extra keyword arguments passed to prepare_data for each supported `use` value.
_PREPARE_OPTIONS_BY_USE = {
    'std': {},
    'seasonal_decompose': {'fill': True, 'add_freq': True, 'freq': '3m'},
    'adfuller': {'fill': True},
    'dataset': {'fill': True},
}


def create_data_df(use, start_date, end_date):
    """Assemble one frame holding every treasury period in PERIODS plus BAA10Y.

    Each column is produced by ``prepare_data`` and appended with
    ``add_data_to_df``. The treasury columns come from
    ``daily_treasury_yield_curve_rate_df`` and the last column from ``baa10y_df``.

    Parameters
    ----------
    use : {'std', 'seasonal_decompose', 'adfuller', 'dataset'}
        Selects the preparation options:
        ``'std'`` uses the prepare_data defaults;
        ``'seasonal_decompose'`` passes ``fill=True, add_freq=True, freq='3m'``;
        ``'adfuller'`` and ``'dataset'`` pass ``fill=True``.
    start_date, end_date : label
        Date bounds forwarded to ``prepare_data``.

    Returns
    -------
    pandas.DataFrame
        One column per entry of PERIODS followed by a ``'BAA10Y'`` column.

    Raises
    ------
    ValueError
        If ``use`` is not a supported value.
    """
    # Validate up front so a typo fails with a clear message rather than an
    # unbound-variable error deep inside the loop.
    if use not in _PREPARE_OPTIONS_BY_USE:
        raise ValueError(
            'use must be one of {}, got {!r}'.format(sorted(_PREPARE_OPTIONS_BY_USE), use))
    options = _PREPARE_OPTIONS_BY_USE[use]

    # (source frame, column label, data_source tag) in output column order.
    sources = [(daily_treasury_yield_curve_rate_df, period, 'yield') for period in PERIODS]
    sources.append((baa10y_df, 'BAA10Y', 'baa10y'))

    data_df = pd.DataFrame()
    for source_df, column, data_source in sources:
        data = prepare_data(source_df,
                            interest_periods=column,
                            data_source=data_source,
                            start_date=start_date,
                            end_date=end_date,
                            **options)
        data_df = add_data_to_df(data_df, column, data)

    return data_df

In [3]:
def add_data_to_df(df, header, data):
    """Return ``df`` with ``data`` appended on the right as a column named ``header``.

    The new column is wrapped in its own one-column frame and joined with
    ``pd.concat`` along the column axis; the ``df`` argument itself is not modified.
    """
    new_column = pd.DataFrame({header: data})
    return pd.concat([df, new_column], axis=1)

def add_column_to_df(df, header, data):
    """Insert ``data`` as the last column of ``df`` (in place) and return ``df``."""
    # Inserting at position == current column count appends on the right.
    df.insert(len(df.columns), header, data)
    return df

In [4]:
def old_plot_data(data, xlabel='', ylabel='', title=''):
    """Plot ``data`` on a single labelled axes.

    Parameters
    ----------
    data : pandas.Series or pandas.DataFrame
        Object whose ``.plot`` method draws the lines.
    xlabel, ylabel, title : str
        Axis labels and title applied before plotting.

    Notes
    -----
    Relies on the notebook-level constants ``FIGSIZE`` and ``FONT_SIZE``.
    NOTE(review): neither constant is defined in the cells visible here —
    confirm they are defined before this function is called.
    """
    _, ax = plt.subplots(figsize=FIGSIZE)
    ax.tick_params(axis='both', labelsize=FONT_SIZE)
    ax.set_xlabel(xlabel, fontdict={'size': FONT_SIZE})
    ax.set_ylabel(ylabel)
    ax.set_title(title)
    # Hand the axes over explicitly: without ``ax`` pandas may open a separate
    # figure for DataFrame input, leaving the labelled axes above empty.
    data.plot(ax=ax)
    
def plot_data(df, title):
    """Draw every column of ``df`` as a line in its own subplot.

    The grid uses two columns when the number of series is even, three when it
    is a multiple of three, and a single stacked column otherwise. Each subplot
    is titled ``"<title> of <column name>"``.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame whose columns are plotted against its index.
    title : str
        Prefix for every subplot title.

    Raises
    ------
    ValueError
        If ``df`` has no columns.
    """
    n_series = df.shape[1]
    if n_series == 0:
        raise ValueError('plot_data needs a DataFrame with at least one column')

    if n_series % 2 == 0:
        ncols = 2
    elif n_series % 3 == 0:
        ncols = 3
    else:
        ncols = 1
    nrows = n_series // ncols

    # squeeze=False keeps `axes` a 2-D array even for a 1x1 grid; otherwise a
    # single-column frame yields a bare Axes object with no .flatten().
    _, axes = plt.subplots(nrows=nrows, ncols=ncols, dpi=120, figsize=(10, 6),
                           squeeze=False)
    for column, ax in zip(df.columns, axes.flatten()):
        ax.plot(df[column], linewidth=1)
        # Decorations
        ax.set_title(title + ' of ' + column)
        ax.xaxis.set_ticks_position('none')
        ax.yaxis.set_ticks_position('none')
        ax.spines["top"].set_alpha(0)
        ax.tick_params(labelsize=6)
    plt.tight_layout()

In [5]:
def decompose(start_date, end_date):
    """Decompose every series additively and plot the trend and seasonal parts.

    The input frame is built with ``create_data_df(use='seasonal_decompose', ...)``.
    Each column is passed to ``seasonal_decompose(..., model='additive')``; the
    resulting trend and seasonal components are gathered into two frames that
    are drawn with ``plot_data``.

    Parameters
    ----------
    start_date, end_date : label
        Date bounds forwarded to ``create_data_df``.
    """
    data_df = create_data_df(use='seasonal_decompose',
                             start_date=start_date, end_date=end_date)

    trends = {}
    seasonals = {}
    # DataFrame.items() yields (column name, Series); the older iteritems()
    # spelling no longer exists in pandas 2.x.
    for name, series in data_df.items():
        result = seasonal_decompose(series, model='additive')
        trends[name] = result.trend
        seasonals[name] = result.seasonal

    # Build each frame once instead of growing it by repeated concatenation.
    plot_data(pd.DataFrame(trends), 'Trend')
    plot_data(pd.DataFrame(seasonals), 'Seasonality')

In [6]:
def test_stationarity(df):
    df_stationary = True
    for name, series in df.iteritems():
        result = adfuller(series.values)
        p_value = result[1]

        print('p-value for period {period} is: {p_value}'.format(period=name, p_value=p_value))
        if p_value <= 0.05:
            print('Time series of no risk interest for {period} is stationary with no unit root\n'.
                  format(period=name))
        else:
            print('Time series of no risk interest for {period} is not stationary and there is a unit root\n'.
                  format(period=name))
            df_stationary = False
    
    if df_stationary:
        print('All time series in df are stationary')
    else:
        print('Not all time series in df are stationary')

In [7]:
def transfrom_from_stationary(stationary_df, train_sample):
    """Rebuild each column as ``train_sample[col]`` plus the running sum of the column.

    Parameters
    ----------
    stationary_df : pandas.DataFrame
        Frame whose columns are cumulatively summed.
    train_sample : mapping-like
        Indexed by column name to obtain the value added to each running sum.

    Returns
    -------
    pandas.DataFrame
        A new frame with the same columns as ``stationary_df``.
    """
    rebuilt = {
        column: train_sample[column] + stationary_df[column].cumsum()
        for column in stationary_df.columns
    }
    return pd.DataFrame(rebuilt)

In [8]:
def plot_prediction(forecast_df, test_df, train_sample, chosen_columns=None):
    """Plot predicted against actual values, one figure per column.

    ``forecast_df`` is first passed through ``transfrom_from_stationary`` together
    with ``train_sample``. For every selected column the converted forecast and the
    matching ``test_df`` column are placed side by side and plotted.

    Parameters
    ----------
    forecast_df : pandas.DataFrame
        Forecast values prior to conversion.
    test_df : pandas.DataFrame
        Actual values, indexed by the same column names.
    train_sample : mapping-like
        Forwarded to ``transfrom_from_stationary``.
    chosen_columns : iterable of labels, optional
        Columns to plot; every converted forecast column when omitted.
    """
    predicted_df = transfrom_from_stationary(forecast_df, train_sample)
    columns_to_plot = predicted_df.columns if chosen_columns is None else chosen_columns

    for col in columns_to_plot:
        comparison = pd.concat([predicted_df[col], test_df[col]], axis=1)
        comparison.columns = ['prediction {}'.format(col), 'actual {}'.format(col)]
        comparison.plot()

In [9]:
from sklearn.metrics import mean_squared_error

def evaluate(forecast_df, test_df, chosen_columns=None, train_sample=None):
    """Print the mean squared error between converted forecasts and actual values.

    ``forecast_df`` is first passed through ``transfrom_from_stationary`` together
    with ``train_sample``; each selected column is then compared with the same
    column of ``test_df`` using ``mean_squared_error``.

    Parameters
    ----------
    forecast_df : pandas.DataFrame
        Forecast values prior to conversion.
    test_df : pandas.DataFrame
        Actual values, indexed by the same column names.
    chosen_columns : iterable of labels, optional
        Columns to score; every converted forecast column when omitted.
    train_sample : mapping-like
        Required despite the ``None`` default kept for signature compatibility:
        the conversion indexes it by column name.

    Raises
    ------
    TypeError
        If ``train_sample`` is not supplied.
    """
    if train_sample is None:
        # Fail with a message that names the missing argument instead of the
        # opaque "'NoneType' object is not subscriptable" from the conversion.
        raise TypeError(
            'evaluate() requires train_sample; transfrom_from_stationary cannot use None')

    converted_df = transfrom_from_stationary(forecast_df, train_sample)
    if chosen_columns is None:
        chosen_columns = converted_df.columns
    for col in chosen_columns:
        mse = mean_squared_error(converted_df[col], test_df[col])
        print('{col} mse: {mse}: '.format(col=col, mse=mse))