In [1]:
import datetime
from typing import Iterator, Tuple, Union

import numpy as np
import pandas as pd

In [6]:
# Load the price history from a local parquet file (an MSCI World index series, going
# by the file name). The SavingPlan classes below read its "Close" column and use its
# index as the trading dates.
df = pd.read_parquet('data/msci_world_index.parquet')

In [7]:
class SavingPlan:
    """ 
    This class is used to calculate the total worth of a saving plan 
    that invests a fixed amount of money on a fixed day of the month.

    Parameters
    ----------
    df : pd.DataFrame
        A pandas DataFrame containing the stock data.
    invest_amount : int
        The amount of money to invest on the fixed day of the month.
    day_to_invest : int
        The day of the month to invest the money.
    period : str or Tuple[datetime.datetime, datetime.datetime]
        The period to consider. If a string is passed, the only available value is "max".
        If a tuple is passed, it should contain two datetime.datetime objects.
    
    Attributes
    ----------
    total_worth : float
        The total worth of the saving plan.    
    """
    def __init__(self, 
                 df: pd.DataFrame,
                 invest_amount: int,
                 day_to_invest: int,
                 period: Union[str, Tuple[datetime.datetime, datetime.datetime]]):
        self.df = df
        self.invest_amount = invest_amount
        self.day_to_invest = day_to_invest
        self.period = period

        if isinstance(self.period, str):
            assert self.period == "max", "If using string the only available value is 'max'"
    
        self.df = df

        if self.df.index.tz:
            self.df.index = self.df.index.tz_convert(None)

        if isinstance(self.period, tuple):
            self.df = self.df.loc[self.period[0]: self.period[1]]
            
    
        self.close_column = "Close"

        self.prepared_df = self.prepare_data()

        self._total_worth = None

    @property
    def total_worth(self):
        if self._total_worth is None:
            self._total_worth = self.prepared_df.tail(1)["total_worth"].iloc[0].item()
        return self._total_worth

    def select_required_columns(self, df: pd.DataFrame) -> pd.DataFrame:
        columns_to_drop = [x for x in df.columns if x != self.close_column]
        return df.drop(columns_to_drop, axis=1)
    
    def rename_close(self, df) -> pd.DataFrame:
        return df.rename({self.close_column: "price"}, axis=1)

    def sort_date(self, df: pd.DataFrame) -> pd.DataFrame:
        return df.sort_index()

    def add_day_of_month(self, df: pd.DataFrame) -> pd.DataFrame:
        """Add the day of the month to the DataFrame."""
        df["day_of_month"]=  df.index.day
        return df
    
    def _find_next_trading_day(self, x: pd.Series, day: int) -> int:
        """Find the next trading day in the Series x after the day of the month day."""
        # Perform a binary search to find the index where the target day should be inserted
        idx = np.searchsorted(x, day, side='left')
        
        # If the index is within bounds, return the next day, otherwise wrap around
        if idx < len(x):
            return x.iloc[idx]
        else:
            return x.iloc[-1] 

    def add_real_day_to_invest(self, df: pd.DataFrame) -> pd.DataFrame:
        """Add the real day to invest to the DataFrame."""
        df["day_to_invest"] = df.groupby(pd.Grouper(freq="1ME"))["day_of_month"].transform(self._find_next_trading_day, day=self.day_to_invest)

        return df
    
    def add_saving(self, df:pd.DataFrame) -> pd.DataFrame:
        """Add the saving executed on the day to invest."""
        df.loc[df.day_of_month == df.day_to_invest, "invest_amount"] = self.invest_amount
        return df

    def add_number_of_stocks_bought(self, df: pd.DataFrame) -> pd.DataFrame:
        df["number_of_stocks_bought"] = df["invest_amount"] / df["price"]
        return df
    
    def add_number_of_stocks(self, df: pd.DataFrame) -> pd.DataFrame:
        df["number_of_stocks"] = df["number_of_stocks_bought"].cumsum()
        return df
    
    def add_total_worth(self, df: pd.DataFrame) -> pd.DataFrame:
        df["total_worth"] = df["number_of_stocks"] * df["price"]
        return df
    
    def prepare_data(self):
        result = (
            self.df
            .pipe(self.select_required_columns)
            .pipe(self.rename_close)
            .pipe(self.add_day_of_month)
            .pipe(self.sort_date)
            .pipe(self.add_real_day_to_invest)
            .pipe(self.add_saving)
            .pipe(self.add_number_of_stocks_bought)
            .pipe(self.add_number_of_stocks)
            .pipe(self.add_total_worth)
            .dropna()
            )
        
        return result

    


# Initial performance

In [8]:
%%timeit

# Benchmark: build a plan that invests 100 on day 1 of every month over the full
# history, then read its final value.
saving_plan = SavingPlan(df, 100, 1, "max")
saving_plan.total_worth

46.3 ms ± 298 μs per loop (mean ± std. dev. of 7 runs, 10 loops each)


# Using numpy

In [10]:
class SavingPlan:
    def __init__(self, 
                 df: pd.DataFrame,
                 invest_amount: int,
                 day_to_invest: int,
                 period: Union[str, Tuple[datetime.datetime, datetime.datetime]]):
        self.df = df
        self.invest_amount = invest_amount
        self.day_to_invest = day_to_invest
        self.period = period

        if isinstance(self.period, str):
            assert self.period == "max", "If using string the only available value is 'max'"
    
        self.df = df

        if self.df.index.tz:
            self.df.index = self.df.index.tz_convert(None)

        if isinstance(self.period, tuple):
            self.df = self.df.loc[self.period[0]: self.period[1]]
            
    
        self.close_column = "Close"

        self.prepared_df = self.prepare_data()

        self._total_worth = None

    @property
    def total_worth(self):
        if self._total_worth is None:
            self._total_worth = self.prepared_df.tail(1)["total_worth"].iloc[0].item()
        return self._total_worth

    def select_required_columns(self, df: pd.DataFrame) -> pd.DataFrame:
        columns_to_drop = [x for x in df.columns if x != self.close_column]
        return df.drop(columns_to_drop, axis=1)
    
    def rename_close(self, df) -> pd.DataFrame:
        return df.rename({self.close_column: "price"}, axis=1)

    def sort_date(self, df: pd.DataFrame) -> pd.DataFrame:
        return df.sort_index()

    def add_day_of_month(self, df: pd.DataFrame) -> pd.DataFrame:
        """Add the day of the month to the DataFrame."""
        df["day_of_month"]=  df.index.day
        return df
    
    @jit
    def _find_next_trading_day(self, x: pd.Series, day: int) -> int:
        """Find the next trading day in the Series x after the day of the month day."""
        # Perform a binary search to find the index where the target day should be inserted
        idx = np.searchsorted(x, day, side='left')
        
        # If the index is within bounds, return the next day, otherwise wrap around
        if idx < len(x):
            return x.iloc[idx]
        else:
            return x.iloc[-1] 

    def add_real_day_to_invest(self, df: pd.DataFrame) -> pd.DataFrame:
        """Add the real day to invest to the DataFrame."""
        df["day_to_invest"] = df.groupby(pd.Grouper(freq="1ME"))["day_of_month"].transform(self._find_next_trading_day, day=self.day_to_invest)

        return df
    
    def add_saving(self, df:pd.DataFrame) -> pd.DataFrame:
        """Add the saving executed on the day to invest."""
        df.loc[df.day_of_month == df.day_to_invest, "invest_amount"] = self.invest_amount
        return df

    def add_number_of_stocks_bought(self, df: pd.DataFrame) -> pd.DataFrame:
        df["number_of_stocks_bought"] = df["invest_amount"] / df["price"]
        return df
    
    def add_number_of_stocks(self, df: pd.DataFrame) -> pd.DataFrame:
        df["number_of_stocks"] = df["number_of_stocks_bought"].cumsum()
        return df
    
    def add_total_worth(self, df: pd.DataFrame) -> pd.DataFrame:
        df["total_worth"] = df["number_of_stocks"] * df["price"]
        return df
    
    def prepare_data(self):
        result = (
            self.df
            .pipe(self.select_required_columns)
            .pipe(self.rename_close)
            .pipe(self.add_day_of_month)
            .pipe(self.sort_date)
            .pipe(self.add_real_day_to_invest)
            .pipe(self.add_saving)
            .pipe(self.add_number_of_stocks_bought)
            .pipe(self.add_number_of_stocks)
            .pipe(self.add_total_worth)
            .dropna()
            )
        
        return result

    


In [11]:
%%timeit

# Same benchmark as before, now against the SavingPlan class defined in the cell above.
saving_plan = SavingPlan(df, 100, 1, "max")
saving_plan.total_worth

TypingError: Failed in nopython mode pipeline (step: nopython frontend)
non-precise type pyobject
During: typing of argument at /var/folders/n0/_ynk_mts4cg0tzzzy408rqyh0000gn/T/ipykernel_40741/3657267690.py (72)

File "../../../../var/folders/n0/_ynk_mts4cg0tzzzy408rqyh0000gn/T/ipykernel_40741/3657267690.py", line 72:
<source missing, REPL/exec in use?> 

This error may have been caused by the following argument(s):
- argument 0: Cannot determine Numba type of <class '__main__.SavingPlan'>
- argument 1: Cannot determine Numba type of <class 'pandas.core.series.Series'>


In [282]:
# Raw arrays the NumPy experiment works on: closing prices and calendar dates per row.
prices = df["Close"].to_numpy()
dates = df.index.date

# Calendar components of every row, read off the datetime.date objects.
years = np.array([d.year for d in dates])
months = np.array([d.month for d in dates])
day_of_month = np.array([d.day for d in dates])

# Row positions at which a new month begins, i.e. where the month or the year
# differs from the previous row.
change_indices = np.flatnonzero((months[:-1] != months[1:]) | (years[:-1] != years[1:])) + 1

# One sub-array of dates per calendar month.
split_dates = np.split(dates, change_indices)

def find_next_trading_day(x: np.ndarray, days: np.ndarray) -> np.ndarray:
    """
    Locate, for every target in ``days``, the first entry of ``x`` that is >= it.

    Parameters
    ----------
    x : np.ndarray
        Values to search in; must be sorted in ascending order.
    days : np.ndarray
        Target value(s) to look up (a scalar is accepted as well).

    Returns
    -------
    np.ndarray
        Insertion positions into ``x`` (not the values themselves). A position
        equal to ``len(x)`` means every entry of ``x`` is smaller than the target.
    """
    # Binary search; side='left' makes an exact match resolve to the match itself.
    return np.searchsorted(x, days, side='left')
    

def encode_date(date: datetime.date) -> int:
    """Pack a date into the integer YYYYMMDD (e.g. 2024-09-03 -> 20240903)."""
    year_month = date.year * 100 + date.month
    return year_month * 100 + date.day

def decode_date(x: int) -> datetime.date:
    """Turn a YYYYMMDD integer back into a ``datetime.date``."""
    # Peel off the year first, then split the remaining MMDD part.
    year, month_and_day = divmod(x, 10000)
    month, day = divmod(month_and_day, 100)
    return datetime.date(year, month, day)

def encode_dates_to_ints(x: np.ndarray) -> np.ndarray:
    """Encode every date in ``x`` as a YYYYMMDD integer.

    Integers can be compared and binary-searched directly, which is what the
    lookup of the next invest day relies on.
    """
    encoded = list(map(encode_date, x))
    return np.array(encoded)

def decode_dates_from_ints(x: np.ndarray) -> np.ndarray:
    """Decode every YYYYMMDD integer in ``x`` back into a ``datetime.date``."""
    decoded = list(map(decode_date, x))
    return np.array(decoded)

# One target per calendar month that actually occurs in the data
def generate_day_ints(dates: np.ndarray, day: int) -> Iterator[int]:
    """
    Yield the YYYYMMDD-encoded target day for every month present in ``dates``.

    Only (year, month) pairs that really occur in ``dates`` produce a target, so
    nothing is generated before the first or after the last month of the data.
    Generation stops as soon as a target lies after the last date, because no
    trading day on or after it exists yet.

    Parameters
    ----------
    dates : np.ndarray
        Array of ``datetime.date`` objects.
    day : int
        Day of the month to invest on.

    Yields
    ------
    int
        ``year * 10000 + month * 100 + day``, in ascending order.
    """
    if len(dates) == 0:
        return

    # Sorted, de-duplicated YYYYMM keys of the months covered by the data.
    month_keys = np.unique([d.year * 100 + d.month for d in dates])

    last_date = max(dates)
    last_encoded = last_date.year * 10000 + last_date.month * 100 + last_date.day

    for key in month_keys:
        target = key * 100 + day
        if target > last_encoded:
            # Can only happen in the final month; a lookup would run off the data.
            break
        yield target

# Day of the month on which the plan invests. It is set here so that the cell does
# not depend on a `day` variable left behind by some other cell.
day = 1

# Row position of the purchase day for every generated monthly target.
indices = find_next_trading_day(encode_dates_to_ints(dates), list(generate_day_ints(dates, day)))

invest_amount = 10
# Shares bought with each monthly instalment.
bought_stocks = invest_amount / prices[indices]

# Running total of shares held after each purchase.
all_stocks = np.cumsum(bought_stocks)

# Value of the holdings on each purchase day, at that day's price.
total_worth = all_stocks * prices[indices]


In [284]:
# Number of purchases: one selected price per generated monthly target.
len(prices[indices])

633

In [271]:
# Value of all shares accumulated so far, priced at the most recent purchase day.
total_worth[-1]

np.float64(65363.184988472385)

# Summarize results

In [300]:
class SavingPlan:
    """
    Monthly saving plan evaluated on plain NumPy arrays.

    A fixed amount is invested once in every calendar month that occurs in the
    data: on the first trading day whose date is on or after ``day_to_invest``,
    or on the month's last trading day when no such day exists in that month.
    All intermediate results are computed lazily and cached.

    Parameters
    ----------
    df : pd.DataFrame
        Stock data with a DatetimeIndex and a "Close" column. The frame passed
        in is not modified.
    invest_amount : int
        The amount of money to invest in every month.
    day_to_invest : int
        The day of the month to invest the money.
    period : str or Tuple[datetime.datetime, datetime.datetime]
        Either the string "max" (use all rows) or a pair of datetime.datetime
        objects used as the bounds of a label-based ``.loc`` slice on the index.

    Attributes
    ----------
    prices : np.ndarray
        Close prices of the selected rows.
    dates : np.ndarray
        ``datetime.date`` of every selected row.
    indices : np.ndarray
        Row positions (into ``prices`` / ``dates``) of the purchase days.
    invest_days : np.ndarray
        The dates of the purchase days.
    bought_stocks : np.ndarray
        Shares bought with each instalment.
    all_stocks : np.ndarray
        Running total of shares held after each purchase.
    total_worth : float
        All shares held, priced at the last purchase day (0.0 without purchases).
    """
    def __init__(self, 
                 df: pd.DataFrame,
                 invest_amount: int,
                 day_to_invest: int,
                 period: Union[str, Tuple[datetime.datetime, datetime.datetime]]):
        self.invest_amount = invest_amount
        self.day_to_invest = day_to_invest
        self.period = period

        if isinstance(self.period, str):
            assert self.period == "max", "If using string the only available value is 'max'"

        # The purchase-day lookup is a binary search and therefore needs ascending
        # dates; sort_index() returns a new frame, the caller's one is left alone.
        self.df = df if df.index.is_monotonic_increasing else df.sort_index()

        if isinstance(self.period, tuple):
            self.df = self.df.loc[self.period[0]: self.period[1]]

        self.prices = self.df["Close"].to_numpy()
        self.dates = self.df.index.date

        # Caches for the lazily computed properties below.
        self._indices = None
        self._bought_stocks = None
        self._all_stocks = None
        self._total_worth = None

    @property
    def indices(self):
        """Row positions of the purchase days (computed on first access)."""
        if self._indices is None:
            self._indices = self.get_indices()
        
        return self._indices
    
    @property
    def invest_days(self):
        """Dates on which a purchase is executed."""
        return self.dates[self.indices]
    
    @property
    def bought_stocks(self):
        """Shares bought with each instalment."""
        if self._bought_stocks is None:
            self._bought_stocks = self.invest_amount / self.prices[self.indices]
        
        return self._bought_stocks
    
    @property
    def all_stocks(self):
        """Running total of shares held after each purchase."""
        if self._all_stocks is None:
            self._all_stocks = np.cumsum(self.bought_stocks)
        
        return self._all_stocks
    
    @property
    def total_worth(self):
        """All shares held times the price on the last purchase day."""
        if self._total_worth is None:
            if len(self.indices) == 0:
                # No purchase was ever made, so nothing is held.
                self._total_worth = 0.0
            else:
                self._total_worth = self.all_stocks[-1] * self.prices[self.indices[-1]]
        
        return self._total_worth
    
    def find_next_trading_day(self, x: np.ndarray, days: np.ndarray) -> np.ndarray:
        """
        Return, for every target in ``days``, the position of the first entry of
        the ascending array ``x`` that is >= the target. A position equal to
        ``len(x)`` means that every entry of ``x`` is smaller than the target.
        """
        # Binary search; side='left' makes an exact match resolve to the match itself.
        return np.searchsorted(x, days, side='left')
    
    def encode_date(self, date: datetime.date) -> int:
        """Pack a date into the integer YYYYMMDD."""
        return date.year * 10000 + date.month * 100 + date.day
    
    def decode_date(self, x: int) -> datetime.date:
        """Turn a YYYYMMDD integer back into a ``datetime.date``."""
        year = x // 10000
        month = (x - year * 10000) // 100
        day = x - year * 10000 - month * 100
        return datetime.date(year, month, day)
    
    def encode_dates_to_ints(self, x: np.ndarray) -> np.ndarray:
        """Encode every date in ``x`` as a YYYYMMDD integer.

        Integers can be compared and binary-searched directly, which is what the
        lookup of the next invest day relies on.
        """
        return np.array([self.encode_date(date) for date in x])
    
    def decode_dates_from_ints(self, x: np.ndarray) -> np.ndarray:
        """Decode every YYYYMMDD integer in ``x`` back into a ``datetime.date``."""
        return np.array([self.decode_date(date) for date in x])
    
    def generate_day_ints(self, dates: np.ndarray, day: int) -> Iterator[int]:
        """
        Yield ``year * 10000 + month * 100 + day`` for every calendar month that
        occurs in ``dates``, in ascending order.

        Only months present in the data produce a target, so nothing is generated
        before the first or after the last month. A target may still lie after the
        last trading day of its own month; ``get_indices`` handles that case.
        """
        # Sorted, de-duplicated YYYYMM keys of the months covered by the data.
        month_keys = np.unique([date.year * 100 + date.month for date in dates])

        for key in month_keys:
            yield key * 100 + day

    def get_indices(self):
        """
        Compute the row position of the purchase day for every month in the data.

        Returns
        -------
        np.ndarray
            One position per month: the first row dated on or after the target
            day, limited to the last row of the target's own month.
        """
        encoded_dates = self.encode_dates_to_ints(self.dates)
        targets = np.array(
            list(self.generate_day_ints(self.dates, self.day_to_invest)),
            dtype=np.int64,
        )
        first_on_or_after = self.find_next_trading_day(encoded_dates, targets)

        # Last row of each target's month: YYYYMM99 is larger than any real day of
        # that month, so the right-hand insertion point is one past its final row.
        month_keys = (targets - self.day_to_invest) // 100
        last_of_month = np.searchsorted(encoded_dates, month_keys * 100 + 99, side='right') - 1

        # Without the limit a late target day would spill into the following month
        # (or past the end of the data for the final month).
        return np.minimum(first_on_or_after, last_of_month)


        
    

In [302]:
%%timeit
# Benchmark of the NumPy-based SavingPlan: 10 per month on day 1 over the full history.
saving_plan = SavingPlan(df, 10, 1, "max")
saving_plan.total_worth

8.68 ms ± 74.9 μs per loop (mean ± std. dev. of 7 runs, 100 loops each)


In [244]:
# Peek at the last three generated YYYYMMDD targets (uses `dates` and `day` from other cells).
list(generate_day_ints(dates, day))[-3:]

[np.int64(20240701), np.int64(20240801), np.int64(20240901)]

In [245]:
encoded_dates = encode_dates_to_ints(dates)

# The binary search in find_next_trading_day only gives meaningful positions on
# ascending input, so check that every encoded date is strictly greater than the
# previous one.
np.all(np.diff(encoded_dates) > 0)




np.True_

In [246]:
# Display the encoded dates.
encoded_dates

array([19720103, 19720104, 19720105, ..., 20240829, 20240830, 20240903])

In [293]:
# Scratch check of the YYYYMMDD encoding for year 2000, month 11, day 31. November has
# no 31st, yet the value is still a valid search target: it sorts after every real
# November date and before every December date.
2000 * (10000) + 11 * (100) + 31

20001131

In [169]:
# Scratch check of the YYYYMMDD encoding for 2001-12-01.
2001 * (10000) + (1) * 12 * (100) + 1

20011201

In [133]:
# Loop-based variant: walk the per-month date chunks together with their target day
# and keep every date whose day-of-month equals that target.
# NOTE(review): `days_to_invest` is not defined in any visible cell — presumably it
# comes from a cell that was deleted or lies outside this view; confirm before a
# Restart & Run All.
# NOTE(review): the inner loop variable `day` rebinds any module-level `day` to a
# date object, which later cells that expect an int day would trip over.
result = []
for all_days, target_day in zip(split_dates, days_to_invest):
    for day in all_days:
        if day.day == target_day:
            result.append(day)



In [67]:
# Display the per-month chunks.
split_dates

[array([], dtype=int64),
 array([1]),
 array([1]),
 array([], dtype=int64),
 array([1]),
 array([1]),
 array([], dtype=int64),
 array([1]),
 array([1]),
 array([], dtype=int64),
 array([1]),
 array([1]),
 array([], dtype=int64),
 array([1]),
 array([1]),
 array([], dtype=int64),
 array([1]),
 array([1]),
 array([], dtype=int64),
 array([1]),
 array([], dtype=int64),
 array([1]),
 array([1]),
 array([], dtype=int64),
 array([], dtype=int64),
 array([1]),
 array([1]),
 array([1]),
 array([1]),
 array([], dtype=int64),
 array([1]),
 array([1]),
 array([], dtype=int64),
 array([1]),
 array([1]),
 array([], dtype=int64),
 array([], dtype=int64),
 array([], dtype=int64),
 array([], dtype=int64),
 array([1]),
 array([1]),
 array([], dtype=int64),
 array([1]),
 array([1]),
 array([1]),
 array([1]),
 array([], dtype=int64),
 array([1]),
 array([], dtype=int64),
 array([], dtype=int64),
 array([1]),
 array([1]),
 array([], dtype=int64),
 array([1]),
 array([1]),
 array([], dtype=int64),
 array([

In [62]:
# Display the per-month target days.
# NOTE(review): `days_to_invest` is not assigned in any visible cell — confirm where it comes from.
days_to_invest

array([3, 1, 1, 3, 1, 1, 3, 1, 1, 2, 1, 1, 2, 1, 1, 2, 1, 1, 2, 1, 3, 1,
       1, 3, 2, 1, 1, 1, 1, 3, 1, 1, 2, 1, 1, 2, 2, 3, 3, 1, 1, 2, 1, 1,
       1, 1, 3, 1, 2, 2, 1, 1, 3, 1, 1, 2, 1, 1, 1, 1, 3, 1, 1, 1, 2, 1,
       1, 1, 1, 3, 1, 1, 2, 1, 1, 3, 1, 1, 3, 1, 1, 2, 1, 1, 2, 1, 1, 2,
       1, 1, 2, 1, 3, 1, 1, 3, 2, 1, 3, 1, 1, 2, 1, 1, 1, 1, 3, 1, 2, 2,
       2, 1, 1, 1, 1, 3, 1, 1, 2, 1, 4, 1, 1, 1, 3, 1, 1, 2, 1, 1, 1, 1,
       3, 1, 1, 1, 2, 1, 1, 1, 1, 3, 1, 1, 2, 1, 1, 2, 1, 1, 2, 1, 3, 1,
       1, 3, 2, 1, 1, 1, 1, 3, 1, 1, 2, 1, 1, 2, 2, 3, 3, 1, 1, 2, 1, 1,
       1, 1, 3, 1, 2, 2, 2, 1, 1, 1, 1, 3, 1, 1, 2, 1, 4, 1, 1, 1, 2, 1,
       1, 1, 1, 3, 1, 1, 2, 1, 1, 3, 1, 1, 3, 1, 1, 2, 1, 1, 2, 1, 1, 2,
       1, 1, 2, 1, 3, 1, 1, 3, 2, 1, 1, 1, 1, 3, 1, 1, 2, 1, 1, 2, 2, 3,
       2, 1, 1, 1, 1, 3, 1, 1, 2, 1, 4, 1, 1, 1, 3, 1, 1, 2, 1, 1, 1, 1,
       3, 1, 1, 1, 2, 1, 1, 1, 1, 3, 1, 1, 3, 1, 1, 3, 1, 1, 3, 1, 1, 2,
       1, 1, 2, 1, 1, 1, 1, 3, 1, 1, 3, 1, 1, 2, 2,

In [54]:
# Try the binary search directly on one month's chunk with the int target 1.
# With datetime.date entries this cannot work, because a date and an int are not
# comparable; the search needs both sides to be of the same, orderable type.
find_next_trading_day(split_dates[0], 1)

TypeError: '<' not supported between instances of 'datetime.date' and 'int'

In [44]:
# Scratch experiment: reshape a 1-D array of dates into rows of three.
# NOTE(review): running this cell rebinds the module-level `dates`, so cells that use
# `dates` afterwards see this array instead of the one derived from `df`.
import numpy as np

# Assuming dates is a 1D array of dates
# (`[...]` is a list holding the Ellipsis object, i.e. a placeholder, not real data)
dates = np.array([...])  # Replace with your actual dates array

# Define the new shape, for example, reshape to 2D array with 3 columns
new_shape = (len(dates) // 3, 3)

# Reshape the dates array
# (this only succeeds when len(dates) is an exact multiple of 3)
reshaped_dates = dates.reshape(new_shape)

print(reshaped_dates)

ValueError: setting an array element with a sequence. The requested array has an inhomogeneous shape after 1 dimensions. The detected shape was (633,) + inhomogeneous part.

In [36]:
# Display the date grouping.
# NOTE(review): `dates_by_month` is not assigned in any visible cell — presumably left
# over from a deleted cell; confirm before a Restart & Run All.
dates_by_month

{datetime.date(1972, 1, 3): array([datetime.date(1972, 1, 3)], dtype=object),
 datetime.date(1972, 1, 4): array([datetime.date(1972, 1, 4)], dtype=object),
 datetime.date(1972, 1, 5): array([datetime.date(1972, 1, 5)], dtype=object),
 datetime.date(1972, 1, 6): array([datetime.date(1972, 1, 6)], dtype=object),
 datetime.date(1972, 1, 7): array([datetime.date(1972, 1, 7)], dtype=object),
 datetime.date(1972, 1, 10): array([datetime.date(1972, 1, 10)], dtype=object),
 datetime.date(1972, 1, 11): array([datetime.date(1972, 1, 11)], dtype=object),
 datetime.date(1972, 1, 12): array([datetime.date(1972, 1, 12)], dtype=object),
 datetime.date(1972, 1, 13): array([datetime.date(1972, 1, 13)], dtype=object),
 datetime.date(1972, 1, 14): array([datetime.date(1972, 1, 14)], dtype=object),
 datetime.date(1972, 1, 17): array([datetime.date(1972, 1, 17)], dtype=object),
 datetime.date(1972, 1, 18): array([datetime.date(1972, 1, 18)], dtype=object),
 datetime.date(1972, 1, 19): array([datetime.date(