In [None]:
#default_exp core

# rfpfolio

> Create portfolios with rebalancing, and measure performance.

In [None]:
#hide

# Do this to see possible %nbdev_ magics
from nbdev import *

In [None]:
#hide
from nbdev.showdoc import *
from fastcore.test import *

## Load Price Data

In [None]:
%nbdev_export
import pandas as pd
import numpy as np
import array
import os.path

In [None]:
%nbdev_export
class PriceSource:
    """
    Loads adjusted price series from CSV files stored beneath a single root directory.
    """
    
    def __init__(self, data_root):
        """
        Args:
            data_root (str): Relative or absolute path to the root directory to load data from.
            In the root should be one or more directories named 'weekly', 'monthly', etc.
        """
        # Keep the absolute form of the path
        self.data_root = os.path.abspath(data_root)
        
    def __repr__(self):
        return (self.__class__.__qualname__ + f"(data_root={self.data_root})")
    
    def loadAdjustedPrices(self, ticker, subdir='weekly'):
        """
        Load data in to single column dataframe, indexed by date.
        The column has the name of the ticker, and is the adjusted price.
        Note: for Yahoo weekly data, the adj price is the adjusted closing price for the 
        week beginning on the specified date.
        
        Args:
            ticker (str): Name of file to load, without '.csv' extension.
            subdir (str): Subdir of data_root wherein to find file to load.
            
        Returns:
            A dataframe with the specified data, indexed by date.
        """
        csv_path = os.path.join(self.data_root, subdir, f"{ticker}.csv")
        # The first CSV column is parsed as dates
        prices = pd.read_csv(csv_path, parse_dates=[0])
        # Keep only the date and adjusted close, and name the price column after the ticker
        prices = prices[['Date', 'Adj Close']]
        prices.columns = ['Date', ticker]
        return prices.set_index('Date')
    
    def loadAllAdjustedPrices(self, tik_list, subdir='weekly'):
        """
        Load adjusted price data for all tickers to a dataframe, indexed by date.
        Each column name is a ticker, and the column values are the sequence of adjusted prices.
        Only dates present for every ticker are kept (inner join on the date index).
        
        Args:
            tik_list: Iterable of ticker names; columns appear in this order. It is traversed
            only once, so a generator is accepted.
            subdir (str): Subdir of data_root wherein to find the files to load.
        
        Returns:
            A dataframe with the specified data, indexed by date.
        """
        # Load each ticker exactly once, preserving the caller's order
        frames = [self.loadAdjustedPrices(tik, subdir) for tik in tik_list]
        return pd.concat(frames, axis=1, join='inner')

Here is an example of loading some weekly data:

In [None]:
tst_src = PriceSource('testdata/2017-Apr')

In [None]:
price_data = tst_src.loadAllAdjustedPrices(['SPY', 'IEI', 'GLD'])
price_data.head()

Unnamed: 0_level_0,SPY,IEI,GLD
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
2017-04-03,220.896225,115.845558,119.459999
2017-04-10,218.369843,116.867851,122.599998
2017-04-17,220.323349,116.943314,122.309998
2017-04-24,223.60112,116.688652,120.769997
2017-05-01,225.122589,116.301956,117.010002


In [None]:
%nbdev_hide
# top 5 rows of data
spy_5 = price_data.loc[:'2017-05-01', 'SPY']
test_close(list(spy_5), [220.896225, 218.369843, 220.323349, 223.60112, 225.122589])

iei_5 = price_data.loc[:'2017-05-01', 'IEI']
test_close(list(iei_5), [115.845558, 116.867851, 116.943314, 116.688652, 116.301956])

gld_5 = price_data.loc[:'2017-05-01', 'GLD']
test_close(list(gld_5), [119.459999, 122.599998, 122.309998, 120.769997, 117.010002])

# last row of data
last = price_data.iloc[-1:,:]
# 2020-06-29 data for spy, iei, gld
# Compare rather than assign: writing into the array returned by to_numpy() checks nothing.
test_close(last.to_numpy()[0], np.array([310.519989, 133.316208, 166.619995]))

## Compute Returns

In [None]:
%nbdev_export
def period_returns_from_prices(tik_prices, wr=False):
    """
    Compute per-period returns from one or more price series.

    Each output row compares a row of `tik_prices` with the row immediately before it, so the
    result has one row fewer than the input and is labelled with the later row's index value.
    
    Arguments:
        tik_prices: DataFrame of ordered prices: ticker column names, rows indexed by date
        wr: if true, report wealth ratios (1.01 <=> 1% return); otherwise report
        fractional returns (0.01 <=> 1% return)

    Returns:
        DataFrame with the columns of `tik_prices` and its index minus the first entry.
    """
    price_ar = tik_prices.to_numpy()
    later, earlier = price_ar[1:], price_ar[:-1]
    # wealth ratio: every row divided by the row that precedes it
    growth = later / earlier
    values = growth if wr else growth - 1
    return pd.DataFrame(values, index=tik_prices.index[1:], columns=tik_prices.columns)

Create some example data to demonstrate usage of `period_returns_from_prices()`:

In [None]:
# An easy way to get a datetime index
spy_wk = tst_src.loadAdjustedPrices('SPY')

# These values are adjusted prices
xmpl_data = pd.DataFrame({'A': [1.0, 1.01, 1.0],
                          'B': [1.0, 1.02, 1.1]},
                         index = spy_wk.index[4:7])
xmpl_data

Unnamed: 0_level_0,A,B
Date,Unnamed: 1_level_1,Unnamed: 2_level_1
2017-05-01,1.0,1.0
2017-05-08,1.01,1.02
2017-05-15,1.0,1.1


Compute period returns.

In [None]:
per_rets = period_returns_from_prices(xmpl_data)
per_rets

Unnamed: 0_level_0,A,B
Date,Unnamed: 1_level_1,Unnamed: 2_level_1
2017-05-08,0.01,0.02
2017-05-15,-0.009901,0.078431


Compute period returns as wealth ratios.

In [None]:
per_rets_wr = period_returns_from_prices(xmpl_data, wr=True)
per_rets_wr

Unnamed: 0_level_0,A,B
Date,Unnamed: 1_level_1,Unnamed: 2_level_1
2017-05-08,1.01,1.02
2017-05-15,0.990099,1.078431


In [None]:
%nbdev_hide

# Automated Tests

# Expected wealth ratio at each (row, column) position of the result
expected_wr = {(0, 0): 1.01 / 1.0,
               (1, 0): 1.00 / 1.01,
               (0, 1): 1.02 / 1.0,
               (1, 1): 1.1 / 1.02}

# period returns: the wealth ratio less one
res1 = period_returns_from_prices(xmpl_data)
for (row, col), ratio in expected_wr.items():
    test_eq(res1.iloc[row, col], ratio - 1)

# wealth ratios
res1 = period_returns_from_prices(xmpl_data, wr=True)
for (row, col), ratio in expected_wr.items():
    test_eq(res1.iloc[row, col], ratio)

**Internal use: for portfolio returns:**

In [None]:
%nbdev_export_internal

def cum_wr_to_period_returns(cum_wr_ar, use_log = False):
    """
    Turn a 1D sequence of cumulative wealth ratios into period fractional returns (0.01 = 1% return).

    The first element of `cum_wr_ar` is taken relative to a starting wealth of 1, so it is itself
    the wealth ratio of the first period (e.g. 1.01 when the first period returned 1%).
    
    Rebalanced portfolio returns are computed as cumulative wealth ratios; this converts that
    portfolio cum wr sequence back to portfolio period returns.
    
    Arguments:
        cum_wr_ar: numpy array - dim 0 is time
        use_log: if true, form each period ratio as the exponential of a difference of logs
        instead of by direct division
        
    Return: 
        numpy array of period returns
    """
    # Prepend the starting wealth of 1 so that every element has a predecessor to divide by
    padded = np.concatenate([np.array([1.0]), cum_wr_ar], 0)
    current, previous = padded[1:], padded[:-1]
    
    if not use_log:
        return current / previous - 1
    
    # Same ratio, computed in log space
    return np.exp(np.log(current) - np.log(previous)) - 1

Example data to show usage of `cum_wr_to_period_returns()`:

In [None]:
per_rets_wr

Unnamed: 0_level_0,A,B
Date,Unnamed: 1_level_1,Unnamed: 2_level_1
2017-05-08,1.01,1.02
2017-05-15,0.990099,1.078431


In [None]:
# compute cumulative returns for input to cum_wr_to_period_returns
cum_rets = np.cumprod(per_rets_wr, axis=0)
cum_rets

Unnamed: 0_level_0,A,B
Date,Unnamed: 1_level_1,Unnamed: 2_level_1
2017-05-08,1.01,1.02
2017-05-15,1.0,1.1


In [None]:
per_rets_A = cum_wr_to_period_returns(cum_rets['A'])
per_rets_A

array([ 0.01      , -0.00990099])

In [None]:
%nbdev_hide
# verify result of cum_wr_to_period_returns is equal to original period returns
test_eq(cum_wr_to_period_returns(cum_rets['A']), per_rets['A'])
test_eq(cum_wr_to_period_returns(cum_rets['B']), per_rets['B'])

## Portfolio Returns

In [None]:
%nbdev_export_internal
def pf_cum_wr_seq(price_df, weights, rebal_period, normalize_wts=False):
    """
    Compute the sequence of cumulative portfolio wealth ratios from a sequence of asset wealth
    ratios, with rebalancing at a fixed interval. The portfolio starts with a value of 1.
    
    Args:
        price_df: Dataframe of asset returns as (noncumulative) wealth ratios, indexed by date
        weights [list]: portfolio weights for each asset ordered the same as columns of `price_df`
        rebal_period: length of the rebalance period, expressed in number of rows of `price_df`;
        must be at least 1
        normalize_wts: if true, divide the weights by their sum before use
        
    Returns:
        A python (double) array containing the sequence of portfolio values (cumulative wealth ratios),
        one per row of `price_df`

    Raises:
        ValueError: if `rebal_period` is less than 1
    """
    # A non-positive period can never advance through the rows; fail with a clear message.
    if rebal_period < 1:
        raise ValueError(f"rebal_period must be a positive number of rows, got {rebal_period!r}")
    
    # weights, as a 1-row 2D matrix so they can be stacked above each period's rows
    weight_ar = np.array([weights])
    
    if normalize_wts:
        weight_ar = weight_ar / weight_ar.sum()
    
    # cols: wr's for a ticker.  rows: dates
    returns = price_df.to_numpy()
    
    # portfolio value carried from one rebalance period into the next
    pf_val = 1
    
    # list of pf values:
    pf_values = array.array('d')
    
    for period_start_ind in range(0, returns.shape[0], rebal_period):
        # first row of array for cumprod is the initial capital per asset
        row_1 = weight_ar * pf_val
        
        # get the data for the next rebalance period (the last period may be shorter)
        rebal_period_return_data = returns[period_start_ind:period_start_ind+rebal_period]
        
        # Result of this operation: first row is initial capital per asset after rebalancing; 
        # each subsequent row is the capital at subsequent dates
        ar_for_cumprod = np.concatenate((row_1, rebal_period_return_data), axis=0)
        asset_values = np.cumprod(ar_for_cumprod, axis=0)
        
        # Compute sequence of portfolio values: sum of columns (asset vals) for each row.
        # Drop first value as it was the sum of the initializing values (asset values).
        pf_rebal_period_values = np.sum(asset_values, axis=1)[1:]
        
        # append portfolio values for the rebalancing period to `pf_values`
        pf_values.extend(pf_rebal_period_values)
        
        # portfolio value at the end of the rebal period; use to compute starting capital
        # for each asset at the start of the next rebalancing period.
        pf_val = pf_rebal_period_values[-1]
        
    return pf_values

In [None]:
%nbdev_export_internal
def pf_period_returns(price_df, weights, rebal_period, pf_name, pf_start_val = 1, normalize_wts=False):
    """
    Compute the sequence of portfolio period returns from a sequence of asset wealth ratios,
    with rebalancing at a fixed interval.
    
    Arguments:
        price_df: Dataframe of asset returns as (noncumulative) wealth ratios, indexed by date
        weights: portfolio weights for each asset ordered the same as columns of `price_df`
        rebal_period: length of the rebalance period, expressed in number of rows of `price_df`
        pf_name: name for this portfolio -- will appear as column name
        pf_start_val: accepted but not used by this function
        normalize_wts: forwarded to `pf_cum_wr_seq`
        
    Returns:
        A DataFrame containing the sequence of portfolio period returns, with the index of `price_df`
    """
    # Portfolio cumulative wealth ratios (with rebalancing), as a numpy array
    cum_wealth = pf_cum_wr_seq(price_df, weights, rebal_period, normalize_wts=normalize_wts)
    
    # Cumulative wealth ratios -> period returns
    period_rets = cum_wr_to_period_returns(np.asarray(cum_wealth))
    
    return pd.DataFrame(period_rets, columns=[pf_name], index=price_df.index)

**Following is the usual entry point.**

In [None]:
%nbdev_export
def computePortfolioReturns(p_src: PriceSource, asset_weights, pf_name, rebal_period, 
                            period='weekly',start_date='2017-05-01', normalize_wts=False):
    """
    Given tickers and their weights, compute the sequence of portfolio returns with rebalancing at a fixed interval.
    
    Arguments:
        p_src: PriceSource used to load the adjusted prices of every ticker
        asset_weights: dictionary from ticker to weight for this asset in pf
        pf_name: name for this portfolio -- will appear as column name
        rebal_period: length of the rebalance period, expressed in number of rows of the loaded price data
        period: name of the data subdirectory to load from, e.g. 'weekly' or 'monthly'
        start_date: date of first period in sequence; first rebalance period begins with this period
        normalize_wts: if true, normalize weights to sum to 1
        
    Returns:
        A DataFrame containing the sequence of portfolio returns (value of 0.01 => 1% return), indexed by date.
    """
    tickers = list(asset_weights.keys())
    weights = list(asset_weights.values())
    
    # Wealth ratios for every asset, restricted to the periods from `start_date` onward
    adj_prices = p_src.loadAllAdjustedPrices(tickers, subdir=period)
    asset_wr = period_returns_from_prices(adj_prices, wr=True).loc[start_date:]
    
    pf_returns = pf_period_returns(asset_wr, weights, rebal_period, pf_name,
                                   normalize_wts=normalize_wts)
    return pf_returns.loc[start_date:]

### Tests

**Set up data**

In [None]:
prices_6040 = tst_src.loadAllAdjustedPrices(['TLT', 'SPY'])
wr_6040 = period_returns_from_prices(prices_6040, wr=True)

In [None]:
wr_6040.head()

Unnamed: 0_level_0,TLT,SPY
Date,Unnamed: 1_level_1,Unnamed: 2_level_1
2017-04-10,1.02503,0.988563
2017-04-17,1.000567,1.008946
2017-04-24,0.990368,1.014877
2017-05-01,0.991336,1.006804
2017-05-08,1.002915,0.996996


In [None]:
# assume a rebalance period of 3
# For interest, choose a short period of high volatility
test_6040 = wr_6040.loc['2020-02-17':'2020-03-30']
test_6040

Unnamed: 0_level_0,TLT,SPY
Date,Unnamed: 1_level_1,Unnamed: 2_level_1
2020-02-17,1.024144,0.987796
2020-02-24,1.049108,0.888389
2020-03-02,1.073788,1.00405
2020-03-09,0.924431,0.905399
2020-03-16,1.035663,0.849547
2020-03-23,1.051872,1.114118
2020-03-30,1.004771,0.979362


In [None]:
# Frozen display of value used in test
test_6040

Unnamed: 0_level_0,TLT,SPY
Date,Unnamed: 1_level_1,Unnamed: 2_level_1
2020-02-17,1.024144,0.987796
2020-02-24,1.049108,0.888389
2020-03-02,1.073788,1.00405
2020-03-09,0.924431,0.905399
2020-03-16,1.035663,0.849547
2020-03-23,1.051872,1.114118
2020-03-30,1.004771,0.979362


**Test pf_cum_wr_seq()**

In [None]:
expected_py_array = array.array('d', [1.00233529086481, 0.9563036816433197, 0.9901485047549465, 0.9040172073166807, 0.8361477995951826, 0.9079644552327022, 0.8984541223283018])
assert pf_cum_wr_seq(test_6040, [0.4, 0.6], 3) == expected_py_array

In [None]:
# test normalization of scaled weight list
assert pf_cum_wr_seq(test_6040, [0.8, 1.2], 3, normalize_wts=True) == expected_py_array

**Test pf_period_returns()**

In [None]:
import empyrical.stats as estats

  from pandas.util.testing import assert_frame_equal


In [None]:
# note fractional returns, not wealth ratios
pf_return_df = pf_period_returns(test_6040, [0.4, 0.6], 3, 'My Portfolio')
assert list(estats.cum_returns(pf_return_df['My Portfolio'], starting_value=1)) == list(expected_py_array)
# pf_return_df

In [None]:
# Validate we get the same results if we start computing the returns much earlier -- as long
# as the rebalance periods are aligned.
# skip the first two elements of wr_6040 -- rebalance times must align to get the same result
pf_period_returns(wr_6040.iloc[2:, :], [0.4, 0.6], 3, 'My Portfolio').loc['2020-02-17':].head(7)

Unnamed: 0_level_0,My Portfolio
Date,Unnamed: 1_level_1
2020-02-17,0.002335
2020-02-24,-0.045924
2020-03-02,0.035391
2020-03-09,-0.086988
2020-03-16,-0.075075
2020-03-23,0.08589
2020-03-30,-0.010474


**Tests for computePortfolioReturns()**

In [None]:
# computePortfolioReturns(p_src, asset_weights, pf_name, rebal_period, period='weekly', start_date='2017-05-01', normalize_wts=False)
cpv_df = computePortfolioReturns(tst_src, {'SPY':0.60, 'TLT': 0.40}, "MyPf", 3, start_date='2020-02-17')
assert (cpv_df.iloc[:7,:].values == pf_return_df.values).all()
cpv_df.head(7)

Unnamed: 0_level_0,MyPf
Date,Unnamed: 1_level_1
2020-02-17,0.002335
2020-02-24,-0.045924
2020-03-02,0.035391
2020-03-09,-0.086988
2020-03-16,-0.075075
2020-03-23,0.08589
2020-03-30,-0.010474
