# Package Requirements

In [1]:
!pip3 install multipledispatch
!pip3 install python-dateutil
!pip3 install yahoo_fin



# Date Utils

In [2]:
import pandas as pd
import numpy as np
import unittest

import datetime
from dateutil.relativedelta import relativedelta

class DateConv:
    @staticmethod
    def numpy_to_py(dt : np.datetime64):
        return pd.Timestamp(dt).to_pydatetime().date()
    
    @staticmethod
    def py_to_numpy(dt : datetime.date):
        return pd.Timestamp(dt).to_datetime64().astype('datetime64[D]')

class DateRange:
    def __init__(self, from_date: datetime.date, to_date: datetime.date, freq='A'):
        self.from_date = from_date
        self.to_date = to_date
        self.freq_delta_map = {
            'A': relativedelta(years=1), 
            'M': relativedelta(months=1),
            'D': relativedelta(days=1),
        }
        self.dts = []
        dt = from_date
        delta = self.freq_delta_map[freq]
        while dt <= to_date:
            self.dts.append(dt)
            dt = dt + delta
    
    def __len__(self):
        return len(self.dts)
    
    def __getitem__(self, index):
        return self.dts[index]
    
    def to_pandas(self, dateconv_fn=None):
        df = pd.DataFrame({'date': pd.Series(self.dts)})
        if dateconv_fn:
            df['date'] = df.apply(lambda r: dateconv_fn(r['date']), axis=1)
        return df

    def to_numpy(self, dateconv_fn=None):
        dates = np.array(self.dts)
        if dateconv_fn:
            dates = np.vectorize(dateconv_fn)(dates)
        return dates

class TestDateRange(unittest.TestCase):
    def test_yearly(self):
        fdt = datetime.date.fromisoformat("2022-08-09")
        tdt = datetime.date.fromisoformat("2023-09-09")
        dr = DateRange(fdt, tdt)
        self.assertEqual(2, len(dr))
        self.assertEqual(datetime.date.fromisoformat("2022-08-09"), dr[0])
        self.assertEqual(datetime.date.fromisoformat("2023-08-09"), dr[1])
    
    def test_pandas(self):
        fdt = datetime.date.fromisoformat("2022-08-09")
        tdt = datetime.date.fromisoformat("2023-09-09")
        df = DateRange(fdt, tdt).to_pandas()
        self.assertEqual(2, len(df))
        
    def test_pandas_with_conv(self):
        fdt = datetime.date.fromisoformat("2022-08-09")
        tdt = datetime.date.fromisoformat("2023-09-09")
        df = DateRange(fdt, tdt).to_pandas(DateConv.py_to_numpy)
        self.assertEqual(2, len(df))
    
    def test_numpy_with_conv(self):
        fdt = datetime.date.fromisoformat("2022-08-09")
        tdt = datetime.date.fromisoformat("2023-09-09")
        df = DateRange(fdt, tdt).to_numpy(DateConv.py_to_numpy)
        self.assertEqual(2, len(df))
    
    def test_boundary_yearly(self):
        fdt = datetime.date.fromisoformat("2022-08-09")
        tdt = datetime.date.fromisoformat("2023-08-09")
        dr = DateRange(fdt, tdt)
        self.assertEqual(2, len(dr))
        self.assertEqual(datetime.date.fromisoformat("2022-08-09"), dr[0])
        self.assertEqual(datetime.date.fromisoformat("2023-08-09"), dr[1])
    
    def test_boundary_yearly1(self):
        fdt = datetime.date.fromisoformat("2022-08-09")
        tdt = datetime.date.fromisoformat("2023-08-08")
        dr = DateRange(fdt, tdt)
        self.assertEqual(1, len(dr))
        self.assertEqual(datetime.date.fromisoformat("2022-08-09"), dr[0])

class TestDateConv(unittest.TestCase):
    def test_numpy_to_py(self):
        dt = DateConv.numpy_to_py(datetime.date.fromisoformat("2022-08-09"))
        self.assertEqual(2022, dt.year)
        self.assertEqual(8, dt.month)
        self.assertEqual(9, dt.day)
    
    def test_py_to_numpy(self):
        dt = DateConv.py_to_numpy(datetime.date.fromisoformat("2022-08-09"))
        self.assertEqual(np.datetime64, type(dt))
        self.assertEqual(2022, DateConv.numpy_to_py(dt).year)

res = unittest.main(argv=[''], verbosity=3, exit=False)
assert len(res.result.failures) == 0 

test_numpy_to_py (__main__.TestDateConv.test_numpy_to_py) ... ok
test_py_to_numpy (__main__.TestDateConv.test_py_to_numpy) ... ok
test_boundary_yearly (__main__.TestDateRange.test_boundary_yearly) ... ok
test_boundary_yearly1 (__main__.TestDateRange.test_boundary_yearly1) ... ok
test_numpy_with_conv (__main__.TestDateRange.test_numpy_with_conv) ... ok
test_pandas (__main__.TestDateRange.test_pandas) ... ok
test_pandas_with_conv (__main__.TestDateRange.test_pandas_with_conv) ... ok
test_yearly (__main__.TestDateRange.test_yearly) ... ok

----------------------------------------------------------------------
Ran 8 tests in 0.009s

OK


# Trading Calendar

In [3]:
from yahoo_fin import stock_info
from datetime import date, timedelta

import numpy as np
import unittest

#TODO:
# store the weekends and holidays instead of trading days
# less dataset to store and lookup
# alternative is to pandas.tseries for weekend & holidays 
# and research if it fits the trading calendar 
snp_daily = stock_info.get_data("^GSPC", end_date="08/31/2023", index_as_date = True, interval="1d")
TRADE_DAYS = snp_daily.index.to_numpy().astype('datetime64[D]')
TRADE_DAYS = np.vectorize(DateConv.numpy_to_py)(TRADE_DAYS)

class TradingCalendar:
    def __init__(self, trading_days=TRADE_DAYS):
        self.ds = TRADE_DAYS
        self.day = timedelta(days=1)
        self.min = np.min(TRADE_DAYS)
        self.max = np.max(TRADE_DAYS)
    
    def is_trade_day(self, cur_date: date) -> bool:
        return cur_date in self.ds
    
    def is_in_range(self, cur_date: DateRange) -> bool:
        return cur_date >= self.min and cur_date <= self.max
    
    def next_day(self, cur_date: date, inclusive: bool = True) -> date:
        dt = cur_date if inclusive else cur_date + self.day
        while dt not in self.ds:
            if dt < TCAL.max:
                dt = dt + self.day
            else:
                dt = None
                break
        return dt
    
    def prev_day(self, cur_date: date, inclusive: bool = True) -> date:
        dt = cur_date if inclusive else cur_date - self.day
        while dt not in self.ds:
            if dt > TCAL.min:
                dt = dt - self.day
            else:
                dt = None
                break
        return dt

TCAL = TradingCalendar()


class TestTradingCalendar(unittest.TestCase):
    def test_weekday(self):
        dt = date.fromisoformat('2021-03-10')
        next_dt = date.fromisoformat('2021-03-11')
        prev_dt = date.fromisoformat('2021-03-09')
        self.assertTrue(TCAL.is_trade_day(dt))
        self.assertEqual(next_dt, TCAL.next_day(dt, inclusive=False))
        self.assertEqual(prev_dt, TCAL.prev_day(dt, inclusive=False))
        self.assertEqual(dt, TCAL.next_day(dt))
        self.assertEqual(dt, TCAL.prev_day(dt))
    
    def test_holiday(self):
        dt = date.fromisoformat('2021-01-01')
        next_dt = date.fromisoformat('2021-01-04')
        prev_dt = date.fromisoformat('2020-12-31')
        self.assertFalse(TCAL.is_trade_day(dt))
        self.assertEqual(next_dt, TCAL.next_day(dt))
        self.assertEqual(prev_dt, TCAL.prev_day(dt))
    
    def test_weekend(self):
        dt = date.fromisoformat('2022-02-05')
        next_dt = date.fromisoformat('2022-02-07')
        prev_dt = date.fromisoformat('2022-02-04')
        self.assertFalse(TCAL.is_trade_day(dt))
        self.assertEqual(next_dt, TCAL.next_day(dt))
        self.assertEqual(prev_dt, TCAL.prev_day(dt))
    
    def test_lower_out_of_range(self):
        dt = TCAL.min - TCAL.day
        next_dt = TCAL.min
        self.assertFalse(TCAL.is_trade_day(dt))
        self.assertEqual(next_dt, TCAL.next_day(dt))
        self.assertIsNone(TCAL.prev_day(dt))
    
    def test_higher_out_of_range(self):
        dt = TCAL.max + TCAL.day
        prev_dt = TCAL.max
        self.assertFalse(TCAL.is_trade_day(dt))
        self.assertIsNone(TCAL.next_day(dt))
        self.assertEqual(prev_dt, TCAL.prev_day(dt))

res = unittest.main(argv=[''], verbosity=3, exit=False)
assert len(res.result.failures) == 0

test_numpy_to_py (__main__.TestDateConv.test_numpy_to_py) ... ok
test_py_to_numpy (__main__.TestDateConv.test_py_to_numpy) ... ok
test_boundary_yearly (__main__.TestDateRange.test_boundary_yearly) ... ok
test_boundary_yearly1 (__main__.TestDateRange.test_boundary_yearly1) ... ok
test_numpy_with_conv (__main__.TestDateRange.test_numpy_with_conv) ... ok
test_pandas (__main__.TestDateRange.test_pandas) ... ok
test_pandas_with_conv (__main__.TestDateRange.test_pandas_with_conv) ... ok
test_yearly (__main__.TestDateRange.test_yearly) ... ok
test_higher_out_of_range (__main__.TestTradingCalendar.test_higher_out_of_range) ... ok
test_holiday (__main__.TestTradingCalendar.test_holiday) ... ok
test_lower_out_of_range (__main__.TestTradingCalendar.test_lower_out_of_range) ... ok
test_weekday (__main__.TestTradingCalendar.test_weekday) ... ok
test_weekend (__main__.TestTradingCalendar.test_weekend) ... ok

----------------------------------------------------------------------
Ran 13 tests in 0.01

# Interest Calculator

In [4]:
import numpy as np
import unittest

class GainCalculator:
    def __init__(self, par=1, cap=np.inf, spread=0, floor=-np.inf):
        self.par = par
        self.cap = cap
        self.spread = spread
        self.floor = floor
    
    def __call__(self, growth):
        gpar = growth * self.par
        egpar = gpar -  self.spread
        fegpar = max(egpar, self.floor)
        cr = min(fegpar, self.cap)
        return cr

class TestGainCalculator(unittest.TestCase):
    
    def test_par(self):
        c = GainCalculator(par=1.5)
        self.assertEquals(0.045, c(0.03))
        
    def test_cap(self):
        c = GainCalculator(cap=.1)
        self.assertEquals(0.1, c(0.2))
    
    def test_floor(self):
        c = GainCalculator(floor=0)
        self.assertEquals(0.0, c(-0.2))
        
res = unittest.main(argv=[''], verbosity=3, exit=False)
assert len(res.result.failures) == 0

test_numpy_to_py (__main__.TestDateConv.test_numpy_to_py) ... ok
test_py_to_numpy (__main__.TestDateConv.test_py_to_numpy) ... ok
test_boundary_yearly (__main__.TestDateRange.test_boundary_yearly) ... ok
test_boundary_yearly1 (__main__.TestDateRange.test_boundary_yearly1) ... ok
test_numpy_with_conv (__main__.TestDateRange.test_numpy_with_conv) ... ok
test_pandas (__main__.TestDateRange.test_pandas) ... ok
test_pandas_with_conv (__main__.TestDateRange.test_pandas_with_conv) ... ok
test_yearly (__main__.TestDateRange.test_yearly) ... ok
  self.assertEquals(0.1, c(0.2))
ok
test_floor (__main__.TestGainCalculator.test_floor) ... ok
test_par (__main__.TestGainCalculator.test_par) ... ok
test_higher_out_of_range (__main__.TestTradingCalendar.test_higher_out_of_range) ... ok
test_holiday (__main__.TestTradingCalendar.test_holiday) ... ok
test_lower_out_of_range (__main__.TestTradingCalendar.test_lower_out_of_range) ... ok
test_weekday (__main__.TestTradingCalendar.test_weekday) ... ok
test_w

# Simple Index

In [5]:
#from multipledispatch import dispatch

import numpy as np
import pandas as pd

import datetime
import unittest

class SimpleTradeIndex:
    def check_lbl(self, price_label, price_cols):
        if price_label not in price_cols:
            raise ValueError("{} not in {}", price_label, price_cols)
        
    def __init__(self, index_df, price_label="low", price_cols=["close", "high", "low"], gaincal=GainCalculator(), tcal=TCAL):
        self.index_df = index_df
        self.tcal = tcal
        self.check_lbl(price_label, price_cols)
        self.lbl = price_label
        self.price_cols = price_cols
        self.gaincal = gaincal
        
    def get_idx(self, dt: datetime.date) -> np.float_:
        df = self.index_df
        return df[df.date == dt][self.lbl].values[0]
    
    def check_date(self, dt: datetime.date):
        dt_s = self.index_df["date"]
        if dt not in dt_s.values:
            raise ValueError("{} not a trading day!".format(dt))
    
    #@dispatch(np.float_, np.float_)
    def calc_gain_by_idx(self, then_idx: np.float_, now_idx: np.float_) -> np.float_:
        return self.gaincal((now_idx - then_idx) / then_idx)
    
    #@dispatch(np.datetime64, np.datetime64)
    def calc_gain(self, from_date: datetime.date, to_date: datetime.date) -> np.float_:
        self.check_date(from_date)
        self.check_date(to_date)
        
        then_idx = self.get_idx(from_date)
        now_idx = self.get_idx(to_date)
        return self.calc_gain_by_idx(then_idx, now_idx)
    
    def calc_gains(self, from_dates: pd.Series, to_date: datetime.date) -> (pd.DataFrame, np.float_):
        gains_df = from_dates.reset_index()
        gains_df.columns = ["from_index", "from_date"]
        gains_df["to_date"] = to_date
        gains_df["to_idx"] = self.get_idx(to_date)
        
        gains_df = self.index_df.merge(gains_df, left_on='date', right_on='from_date')
        gains_df["from_idx"] = gains_df[self.lbl]
        drop_cols = [x for x in gains_df.columns.values if x in ["date", *self.price_cols]]
        gains_df = gains_df.drop(drop_cols, axis=1)
        
        gains_df["gain_ratio"] = gains_df.apply(lambda r: self.calc_gain(r['from_date'], r['to_date']), axis=1)
        return gains_df, np.mean(gains_df["gain_ratio"])
    
class TestSimpleTradeIndex(unittest.TestCase):
    def setUp(self):
        idx_df = pd.DataFrame(columns=["date", "close", "low"])
        
        self.lows = [9, 10, 4, 10, 16]
        self.closes = [10, 11, 5, 15, 20]
        
        first_day = datetime.date.fromisoformat('2022-03-07')
        idx_df["date"] = pd.Series([first_day+datetime.timedelta(days=x) for x in range(5)])
        idx_df["close"] = pd.Series(self.closes)
        idx_df["low"] = pd.Series(self.lows)
        
        self.first_day = first_day
        self.one_day = np.timedelta64(1, 'D')
        self.sti_low = SimpleTradeIndex(idx_df)
        self.sti_close = SimpleTradeIndex(idx_df, price_label="close")
    
    def add_days(self, d):
        return self.first_day + datetime.timedelta(days=d)
    
    def test_calc_gain_by_dates_by_low(self):
        from_date = self.add_days(1)
        to_date = self.add_days(3)
        expected_ratio = (self.lows[3] - self.lows[1]) / self.lows[1]
        np.testing.assert_almost_equal(expected_ratio, self.sti_low.calc_gain(from_date, to_date))

    def test_calc_gain_by_dates_by_close(self):
        from_date = self.add_days(1)
        to_date = self.add_days(3)
        expected_ratio = (self.closes[3] - self.closes[1]) / self.closes[1]
        np.testing.assert_almost_equal(expected_ratio, self.sti_close.calc_gain(from_date, to_date))

    def test_calc_gains(self):
        from_dates = pd.Series([self.first_day+datetime.timedelta(days=x) for x in range(3)])
        from_df, mean_gain = self.sti_low.calc_gains(from_dates, self.add_days(3))
        expected_gains = [(self.lows[3] - x) / x for x in self.lows[0:3]]
        expected_mean = np.mean(expected_gains)
        np.testing.assert_almost_equal(expected_mean, mean_gain)
        pd.testing.assert_series_equal(pd.Series(expected_gains), from_df["gain_ratio"], check_names=False)
        
    def test_assertion_from_date(self):
        from_date = self.add_days(10)
        to_date = self.add_days(13)
        self.assertRaises(ValueError, lambda: self.sti_low.calc_gain(from_date, to_date))

    def test_assertion_to_date(self):
        from_date = self.add_days(1)
        to_date = self.add_days(13)
        self.assertRaises(ValueError, lambda: self.sti_low.calc_gain(from_date, to_date))


res = unittest.main(argv=[''], verbosity=3, exit=False)
assert len(res.result.failures) == 0

test_numpy_to_py (__main__.TestDateConv.test_numpy_to_py) ... ok
test_py_to_numpy (__main__.TestDateConv.test_py_to_numpy) ... ok
test_boundary_yearly (__main__.TestDateRange.test_boundary_yearly) ... ok
test_boundary_yearly1 (__main__.TestDateRange.test_boundary_yearly1) ... ok
test_numpy_with_conv (__main__.TestDateRange.test_numpy_with_conv) ... ok
test_pandas (__main__.TestDateRange.test_pandas) ... ok
test_pandas_with_conv (__main__.TestDateRange.test_pandas_with_conv) ... ok
test_yearly (__main__.TestDateRange.test_yearly) ... ok
  self.assertEquals(0.1, c(0.2))
ok
test_floor (__main__.TestGainCalculator.test_floor) ... ok
test_par (__main__.TestGainCalculator.test_par) ... ok
test_assertion_from_date (__main__.TestSimpleTradeIndex.test_assertion_from_date) ... ok
test_assertion_to_date (__main__.TestSimpleTradeIndex.test_assertion_to_date) ... ok
test_calc_gain_by_dates_by_close (__main__.TestSimpleTradeIndex.test_calc_gain_by_dates_by_close) ... ok
test_calc_gain_by_dates_by_lo

# Compounded Pt to Pt Trade Index

In [6]:
import pandas as pd
import numpy as np
import datetime
from dateutil.relativedelta import relativedelta

import unittest

class CompoundedTradeIndex(SimpleTradeIndex):
    def check_frequency(self, freq):
        valid_freq_values = ['A', 'M', 'D']
        if freq not in valid_freq_values:
            raise ValueError("{} is not in {}".format(freq, valid_freq_values))
        return freq
    
    def __init__(self, index_df, frequency="A", gaincal=GainCalculator(par=1), price_label="low", price_cols=["close", "open", "high", "low"], tcal=TCAL):
        super().__init__(index_df, price_label, price_cols, gaincal=gaincal, tcal=tcal)
        self.frequency = self.check_frequency(frequency)
    
    def calc_gain(self, from_date: datetime.date, to_date: datetime.date) -> np.float_:
        dts = DateRange(
            from_date, 
            to_date, 
            'A'
        ).to_numpy()
        
        def calc(from_date, to_date):
            from_idx = self.get_idx(self.tcal.prev_day(from_date))
            to_idx = self.get_idx(self.tcal.prev_day(to_date))
            return self.calc_gain_by_idx(from_idx, to_idx)
        
        fdt = dts[0]
        v = 1
        for tdt in dts[1:]:
            v = v + v * calc(fdt, tdt)
            fdt = tdt
        return v

class TestCompoundedTradeIndex(unittest.TestCase):
    def setUp(self):
        idx_df = pd.DataFrame(columns=["date", "close", "low"])
        
        self.lows = [9, 10, -4, 10, 16]
        self.closes = [10, 11, 5, 15, 20]
        
        first_day = datetime.date.fromisoformat('2000-03-07')
        self.tcal = TradingCalendar(np.array([first_day+datetime.timedelta(days=x) for x in range(356*5)]))
        idx_df["date"] = pd.Series([first_day+relativedelta(years=x) for x in range(5)])
        idx_df["close"] = pd.Series(self.closes)
        idx_df["low"] = pd.Series(self.lows)
        
        self.first_day = first_day
        self.cti = CompoundedTradeIndex(idx_df, "A", tcal=self.tcal)
    
    def cgain(self, idx):
        v = 1
        for i in range(1, len(idx)):
            v = v + v * ((idx[i]-idx[i-1])/idx[i-1])
        return v
    
    def test_gain(self):
        from_date = self.first_day
        to_date = self.first_day+relativedelta(years=3)+datetime.timedelta(days=25)
        expected_gain = self.cgain(self.lows[0:4])
        actual_gain = self.cti.calc_gain(from_date, to_date)
        np.testing.assert_almost_equal(expected_gain, actual_gain)
    
    def test_gains(self):
        from_dates = pd.Series([self.first_day+relativedelta(years=x) for x in range(3)])
        to_date = self.first_day+relativedelta(years=3)
        from_df, mean_gain = self.cti.calc_gains(from_dates, to_date)
        expected_gains = [self.cgain(self.lows[x:4]) for x in range(3)]
        expected_mean = np.mean(expected_gains)
        np.testing.assert_almost_equal(expected_mean, mean_gain)
        pd.testing.assert_series_equal(pd.Series(expected_gains), from_df["gain_ratio"], check_names=False)
        
    
res = unittest.main(argv=[''], verbosity=3, exit=False)
assert len(res.result.failures) == 0

test_gain (__main__.TestCompoundedTradeIndex.test_gain) ... 

ok
test_gains (__main__.TestCompoundedTradeIndex.test_gains) ... ok
test_numpy_to_py (__main__.TestDateConv.test_numpy_to_py) ... ok
test_py_to_numpy (__main__.TestDateConv.test_py_to_numpy) ... ok
test_boundary_yearly (__main__.TestDateRange.test_boundary_yearly) ... ok
test_boundary_yearly1 (__main__.TestDateRange.test_boundary_yearly1) ... ok
test_numpy_with_conv (__main__.TestDateRange.test_numpy_with_conv) ... ok
test_pandas (__main__.TestDateRange.test_pandas) ... ok
test_pandas_with_conv (__main__.TestDateRange.test_pandas_with_conv) ... ok
test_yearly (__main__.TestDateRange.test_yearly) ... ok
  self.assertEquals(0.1, c(0.2))
ok
test_floor (__main__.TestGainCalculator.test_floor) ... ok
test_par (__main__.TestGainCalculator.test_par) ... ok
test_assertion_from_date (__main__.TestSimpleTradeIndex.test_assertion_from_date) ... ok
test_assertion_to_date (__main__.TestSimpleTradeIndex.test_assertion_to_date) ... ok
test_calc_gain_by_dates_by_close (__main__.TestSimpleTradeIndex.te

# Combine multiple indexes

In [7]:
import datetime
import numpy as np

import unittest

class IndexStrategy:
    def compute_ror(self):
        pass
    
class SingleIndexStrategy(IndexStrategy):
    def __init__(self, index: SimpleTradeIndex):
        self.index = index
    
    def compute_ror(self, investments: pd.DataFrame, now_date: datetime.date) -> np.float_:
        from_dates = investments["date"]
        if len(from_dates) == 0:
            return 0.0
        
        gains_df, _ = self.index.calc_gains(from_dates, now_date)
        df = investments.merge(gains_df, left_index=True, right_on="from_index")
        return np.sum(df['amount'] * df['gain_ratio'])
    
    def compute_req(self, target_amount: np.float_, from_date:datetime.date, now_date: datetime.date) -> np.float_:
        gain_ratio = self.index.calc_gain(from_date, now_date)
        return target_amount/(1 + gain_ratio)

class MultiIndexStrategy(IndexStrategy):
    def __init__(self, indexes: list((np.float_, IndexStrategy))):
        self.indexes = indexes
    
    def compute_ror(self, investments: pd.DataFrame, now_date: datetime.date) -> np.float_:
        ror = 0
        for share, strategy in self.indexes:
            ror += share * strategy.compute_ror(investments, now_date)
        return ror

    def compute_req(self, target_amount: np.float_, from_date:datetime.date, now_date: datetime.date) -> np.float_:
        amount_req = 0
        for share, strategy in self.indexes:
            amount_req += strategy.compute_req(share * target_amount, from_date, now_date)
        return amount_req

#TODO: unit tests

# Account Management

In [8]:
import pandas as pd
import numpy as np

class Account:
    def __init__(self, strategy: IndexStrategy):
        self.strategy = strategy
        self.tcal = TCAL
        self.credits = pd.DataFrame(columns=["date", "amount", "adj_amount", "source"])
        self.debits = pd.DataFrame(columns=["date", "amount", "source", "gains"])

    def credit(self, cur_date, amount, source='customer'):
        if self.tcal.is_trade_day(cur_date):
            self.credits.loc[len(self.credits)] = [cur_date, amount, amount, source]
        else:
            raise ValueError("{} is not a trading day".format(cur_date))
        return self
    
    def calc_balance(self, now_date):
        now_work_date = self.tcal.prev_day(now_date)
    
        filtered_credits = self.credits[self.credits["date"] <= now_work_date]
        filtered_debits = self.debits[self.debits["date"] <= now_work_date]
        
        total_credits = np.sum(filtered_credits["amount"])
        total_debits = np.sum(filtered_debits["amount"])
        total_contributions = total_credits - total_debits
        
        gains_from_credits = self.strategy.compute_ror(
            filtered_credits[["date", "amount"]], 
            now_work_date
        )
        
        gains_from_debits = self.strategy.compute_ror(
            filtered_debits[["date", "amount"]], 
            now_work_date
        )
        total_gains = gains_from_credits - gains_from_debits
        cur_balance = total_contributions + total_gains
        return (cur_balance, total_gains, total_credits, total_debits)
    
    def source_debits(self, cur_date, debit_amount):
        filtered_credits = self.credits[self.credits["date"] <= cur_date]
        
        now_date = cur_date
        df = filtered_credits.apply(lambda row: 
            pd.Series([
                row["adj_amount"] + self.strategy.compute_ror(
                    now_date,
                    pd.DataFrame({'date': [ row['date'] ], 'amount': [ row["adj_amount"]]})
                )
            ], axis=1)
        )
        
        #cummulative sum to filter credits to source the debit
        df = df.iloc[:(df[0].cumsum(axis=0) <= debit_amount).idxmin(),:]
        
        #compute new_adj_amount
        df = df.assign(new_adj_amount=0)
        debited_amount = np.sum(df[0])
        if debited_amount < debit_amount:
            last_index = df[0].idxmax()+1
            remaining_amount = (debit_amount - debited_amount)
            
            last_row = filtered_credits.iloc[last_index]
            amount_req = self.strategy.compute_req(remaining_amount, last_row['date'], now_date)
            new_adj_amount = last_row["adj_amount"] - amount_req
            df.loc[last_index] = [remaining_amount, new_adj_amount]
            
        return df
    
    def debit(self, cur_date, amount, source="customer"):
        if not self.tcal.is_trade_day(cur_date):
            raise ValueError("{} not a trading day".format(cur_date))
        
        balance, _, _, _ = self.calc_balance(cur_date)
        if amount > balance:
            raise ValueError("insufficient balance")
        
        #get credits sourcing debits
        cod_df = self.source_debits(cur_date, amount)
        
        gains = []
        for x in cod_df.index:
            withdrawl = cod_df.iloc[x]
            credit = self.credits.iloc[x]
            gains.append((credit["date"], withdrawl[0]-credit["adj_amount"]))
            self.credits.iloc[x]["adj_amount"] = withdrawl["new_adj_amount"]
            
        self.debits.loc[len(self.debits)] = [cur_date, amount, source, gains]
        return self

#TODO: unit tests

# Fund Simulator

In [9]:
from yahoo_fin import stock_info
import pandas as pd
import numpy as np
import datetime

class StockDataLoader:
    def __init__(self):
        pass
    
    def __len__(self) -> np.int_:
        pass
    
    def to_pandas(self) -> pd.DataFrame:
        pass

class YahooStockDataLoader(StockDataLoader):
    def __init__(self, ticker: np.string_ = "^GSPC", end_date: datetime.date = datetime.date.fromisoformat("2023-08-31")):
        super().__init__()
        data_df = stock_info.get_data(ticker, end_date=end_date, index_as_date = True, interval="1d")
        data_df.reset_index(inplace=True)
        data_df = data_df.rename(columns = {'index':'date'})
        data_df['date'] = data_df['date'].apply(lambda x: DateConv.numpy_to_py(x))
        self.data_df = data_df
    
    def __len__(self) -> np.int_:
        return len(self.data_df)
    
    def to_pandas(self) -> pd.DataFrame:
        return self.data_df

class InstallmentGenerator:
    def __init__(self, tcal=TCAL):
        self.tcal = tcal
        self.freq_delta_map = {
            'A': relativedelta(years=1),
            'M': relativedelta(months=1),
            'D': relativedelta(days=1)
        }
    
    def generate(self):
        return []
    
class EqualInstallmentGenerator(InstallmentGenerator):
    def __init__(self, amount, start_date, frequency='M', num=100, tcal=TCAL):
        super().__init__(tcal)
        self.amount = amount
        self.start_date = start_date
        self.frequency = frequency
        self.num = num
    
    def generate(self):
        installments = []
        delta = self.freq_delta_map[self.frequency]
        dt = self.start_date
        ninstallments = 0
        while ninstallments < self.num:
            dt = self.tcal.next_day(dt)
            installments.append((dt, self.amount))
            dt = dt + delta
            ninstallments += 1
        return installments

class FundSimulator:
    def __init__(self, invest_style="simple", deposit_freq='M', tcal=TCAL):
        self.style_map = {
            'simple': lambda d, gc: SimpleTradeIndex(index_df=d, gaincal=gc),
            'compounded-annual': lambda d, gc: CompoundedTradeIndex(index_df=d, frequency='A', gaincal=gc)
        }
        
        self.deposit_freq = deposit_freq
        self.tcal = tcal
        self.allocs = []
        self.installments = []
        self.withdraws = []
        self.strategy = None
        self.style = invest_style
    
    def add_alloc(self, alloc, ticker, gain_calc: GainCalculator):
        self.allocs.append((alloc, ticker, gain_calc))
        return self
    
    def add_deposits(self, generator: InstallmentGenerator):
        self.installments = generator.generate()
        return self
    
    def add_withdraws(self, amount, withdraw_dates):
        self.withdraws = []
        for dt in withdraw_dates:
            self.withdraws.append(self.tcal.next_day(dt), amount)
        return self
    
    def _validate_allocs(self):
        total = np.sum([x[0] for x in self.allocs])
        if total != 1:
            raise ValueError("sum of all allocations is not 1")
        return total
    
    def _build_index(self, ticker, gaincal):
        data_df = YahooStockDataLoader(ticker=ticker).to_pandas()
        return self.style_map[self.style](data_df, gaincal)
    
    def _build_single_strategy(self, ticker, gaincal):
        index = self._build_index(ticker, gaincal)
        return SingleIndexStrategy(index)
    
    def _build_strategy(self):
        self._validate_allocs()
        if len(self.allocs) == 1:
            share, ticker, gaincal = self.allocs[0]
            return self._build_single_strategy(ticker, gaincal)
        
        strategies = []
        for share, ticker, gaincal in self.allocs:
            strategies.append((share, self._build_single_strategy(ticker, gaincal)))
            
        return MultiIndexStrategy(strategies)
    
    def _make_deposits(self):
        for dt, amt in self.installments:
            self.fund_account.credit(dt, amt)
            
    def _make_withdraws(self):
        for dt, amt in self.withdraws:
            self.fund_account.debit(dt, amt)
    
    def _collect_fees(self):
        pass
    
    def build(self):
        strategy = self._build_strategy()
        self.fund_account = Account(strategy=strategy)
        self._make_deposits()
        self._collect_fees()
        self._make_withdraws()
        return self
        
    def summarize_yearly(self, start_date : datetime.date, end_date : datetime.date):
        delta = relativedelta(years=1)
        dt = start_date
        
        summary_df = pd.DataFrame(columns=["date", "balance", "gains", "gains_percent", "deposits", "withdraws"])
        while dt < end_date:
            cur_balance, total_gains, total_credits, total_debits = self.fund_account.calc_balance(dt)
            summary_df.loc[len(summary_df)] = [
                dt,
                round(cur_balance, 2),
                round(total_gains, 2), 
                round(total_gains/(total_credits-total_debits)*100, 2), 
                round(total_credits, 2), 
                round(total_debits, 2)
            ]
            dt = dt + delta
        
        return summary_df
    
    def gen_tax_report(self):
        return pd.DataFrame(columns=["tax_year", "gains", "tax_rate", "tax"])

In [15]:
import datetime
import pandas as pd
import numpy as np

# snp - simple - nofee - simulator
# 21 years monthly 1000 deposit
# 12,13,14,15th years 40k withdrawl
# summary after 21 years

fund_start_date = datetime.date.fromisoformat("2023-08-01") - relativedelta(years=35)

snp_simple_nofee = FundSimulator().add_alloc(
        alloc=1, 
        ticker="^GSPC", 
        gain_calc=GainCalculator(par=1)
    ).add_deposits(
        generator=EqualInstallmentGenerator(
        amount=(655.55-75),
        start_date=fund_start_date,
        frequency='M',
        num=30*12)
    ).build()

summary_df = snp_simple_nofee.summarize_yearly(
    fund_start_date+relativedelta(years=1), 
    fund_start_date+relativedelta(years=40))

summary_df

1988-08-01


Unnamed: 0,date,balance,gains,gains_percent,deposits,withdraws
0,1989-08-01,8319.32,1352.72,19.42,6966.6,0
1,1990-08-01,15702.63,1769.43,12.7,13933.2,0
2,1991-08-01,24968.99,4069.19,19.47,20899.8,0
3,1992-08-01,34036.33,6750.48,24.74,27285.85,0
4,1993-08-01,43209.27,8956.82,26.15,34252.45,0
5,1994-08-01,51229.11,10010.06,24.29,41219.05,0
6,1995-08-01,70198.21,22012.56,45.68,48185.65,0
7,1996-08-01,87251.71,32680.01,59.88,54571.7,0
8,1997-08-01,136720.07,75181.77,122.17,61538.3,0
9,1998-08-01,169907.91,101403.01,148.02,68504.9,0
