In [234]:
import yfinance as yf
import pandas as pd
import math
# import numpy_financial as npf
import statistics
import datetime
from dateutil.relativedelta import relativedelta
from pandas.core.frame import DataFrame
# from scipy.stats import gmean

In [235]:
# date preprocessing 為了計算設定的回測日期的第一個月報酬率，需有前一個月底的價格，所以起始日期要從起始的前一個月開始
def date_preprocessing(start, end):
    start_dt = datetime.datetime.strptime(start, "%Y-%m-%d")
    start_dt = (start_dt-relativedelta(months = +1)).strftime("%Y-%m-%d")
    end_dt = datetime.datetime.strptime(end, "%Y-%m-%d")
    end_dt = (end_dt + datetime.timedelta(days = 1)).strftime("%Y-%m-%d")
    return start_dt, end_dt

In [236]:
def choose_annually_monthly(time_period, df, start_dt, end_dt):
    stock_df = df.copy()
    if time_period==0:
        stock_m_df = stock_df.resample('BM').last()
    elif time_period==1:
        stock_m_df = stock_df.resample('Y').last()
    else :
        print('週期輸入錯誤')
    stock_m_df = stock_m_df.loc[start_dt:end_dt]
    return stock_m_df

In [237]:
# get stock price 取得各支股票每月月底調整後收盤價，並合併成一個dataframe
def get_stock_price(stock, start, end, time_period):
    date = date_preprocessing(start,end)
    start_dt = date[0]
    end_dt = date[1]
    stock_name = []
    stock_list = []
    for i in range(len(stock)):
    #     a=round(yf.download(stock[i],start=start_dt,end=end_dt),2)
        daily_price = yf.download(stock[i], start=start_dt, end=end_dt)
        stock_list.append(daily_price)

    for i in stock_list:
        i.drop(["Open"],axis = 1,inplace = True)
        i.drop(["High"],axis = 1,inplace = True)
        i.drop(["Low"],axis = 1,inplace = True)
        i.drop(["Close"],axis = 1,inplace = True)
        i.drop(["Volume"],axis = 1,inplace = True)

    if len(stock) == 1:
        price = stock_list[0].copy()
        price.rename(columns = {'Adj Close':stock[0]},inplace = True)
    else:
        price = stock_list[0].copy()
        price.rename(columns = {'Adj Close':stock[0]}, inplace = True)
        for i in range(1,len(stock)):
            price=pd.merge(price, stock_list[i], left_index = True, right_index = True,how = 'inner')
            price.rename(columns = {'Adj Close':stock[i]}, inplace=True)
    stock_m_df = choose_annually_monthly(time_period, price, start_dt, end_dt)       
    return stock_m_df

In [238]:
def total_cost(df):
    TotalCost_list = []
    for i in range(1,len(df)+1):
        TotalCost = PMT*i
        TotalCost_list.append(TotalCost)
    df['total_cost'] = TotalCost_list
    return df

In [239]:
# 各支股票股數計算
def calculate_stock_units(stock, df, PMT, ratio):
    unit_name_list = []

    for k in range(len(stock)):
        unit_list = [PMT*ratio[k]/df[df.columns[k]].iloc[0]]
        unit_name_list.append("unit"+str(k+1))
        for i in df[df.columns[k]].iloc[1:]:
            unit = unit_list[-1]+PMT*ratio[k]/i
            unit = unit
            unit_list.append(unit)
        df[unit_name_list[k]] = unit_list
    return df, unit_name_list

In [240]:
# 各支股票淨值計算
def pre_NAV(stock, df, PMT, ratio, unit_name_list):
    pre_NAV_name_list = []
    for k in range(len(stock)):
        pre_NAV_list = [0]
        pre_NAV_name_list.append("pre NAV"+str(k+1))
        for i,j in enumerate(df[df.columns[k]].iloc[1:]):
            pre_NAV = df[unit_name_list[k]].iloc[i]*j
            pre_NAV_list.append(pre_NAV)
        df[pre_NAV_name_list[k]] = pre_NAV_list
    return df, pre_NAV_name_list

In [241]:
# 計算每月累計損益 'final'
def final(stock, df, PMT, pre_NAV_name_list):
    df["final"] = df[pre_NAV_name_list[0]].copy()
    for i in range(1,len(stock)):
        df["final"] += df[pre_NAV_name_list[i]]
    for i in range(len(df)):
        df["final"][i] += PMT
    return df

In [242]:
# monthly_return

def monthly_return(df, PMT):
    m_return = df.copy()
    monthly_return = [(m_return["final"][0]/PMT)-1]
    for i in range(1,len(m_return)):
        mreturn = ((m_return["final"][i]-PMT)/m_return["final"][i-1])-1
        monthly_return.append(mreturn*100)

    df["monthly return(%)"] = monthly_return 
    return df

In [243]:
#每月月底變動比例買進
def allocation_drift(stock, df, PMT, unit_name_list, pre_NAV_name_list):
    stock_m_df_2 = df.copy()
    final_l = stock_m_df_2['final'].copy()
    for i in range(len(stock_m_df_2)):
        final_l[i] = PMT
    for i in range(1,len(stock_m_df_2)):
        for j in range(len(stock)):
            stock_m_df_2[pre_NAV_name_list[j]].iloc[i] = stock_m_df_2[unit_name_list[j]].iloc[i-1]*stock_m_df_2[stock[j]].iloc[i]
            final_l[i] += stock_m_df_2[pre_NAV_name_list[j]].iloc[i]
        stock_m_df_2['final'][i] = final_l[i]    
        for j in range(len(stock)):        
            stock_m_df_2[unit_name_list[j]].iloc[i] = (((stock_m_df_2[pre_NAV_name_list[j]].iloc[i]/(stock_m_df_2['final'].iloc[i]-PMT))*PMT)/stock_m_df_2[stock[j]].iloc[i])
            stock_m_df_2[unit_name_list[j]].iloc[i] += stock_m_df_2[unit_name_list[j]].iloc[i-1]
    stock_m_df_2 = monthly_return(stock_m_df_2, PMT)  
    return stock_m_df_2
     
    

In [244]:
   # profit
def asset_profit(stock, df, PMT, ratio):
    # 每月固定比例買進-----[start]
    stock_df = df.copy()
    # 計算total cost
    stock_df_cost = total_cost(stock_df)

    # 各支股票股數計算
    stock_units = calculate_stock_units(stock, stock_df_cost, PMT,ratio)
    stock_df_unit = stock_units[0]
    unit_name_list = stock_units[1]
    
    # 各支股票淨值計算
    stock_pre_NAV = pre_NAV(stock, stock_df_unit, PMT, ratio, unit_name_list)
    stock_df_pre_NAV = stock_pre_NAV[0]
    pre_NAV_name_list = stock_pre_NAV[1]

    # 計算每月累計損益 'final'
    stock_df_final = final(stock, stock_df_pre_NAV, PMT, pre_NAV_name_list)
    
    # monthly_return
    stock_df_fixed = monthly_return(stock_df_final, PMT)          
    # 每月固定比例買進-----[end]
    
    #每月變動比例買進-----[start]
    stock_df_drift = allocation_drift(stock, stock_df_fixed, PMT, unit_name_list, pre_NAV_name_list)     
    #每月變動比例買進-----[end]      
    
    return stock_df_fixed,stock_df_drift

In [245]:
# CAGR
def CAGR(df, time_period):
    if time_period == 0:
        cagr=round(((df['final'][-1]/df['total_cost'][0])**(1/((len(df)-1)/12))-1)*100,2)
    elif time_period == 1:
        cagr=round(((df['final'][-1]/df['total_cost'][0])**(1/((len(df)-1)))-1)*100,2)
    return cagr

In [246]:
# # profit
# profit = asset_profit(stock,price,PMT,ratio)
# asset_profit_fixed = profit[0]
# asset_profit_drift = profit[1]
# # asset_profit_drift

In [266]:
# risk :stdev,Sharpe,Sortino 用月報酬計算風險指標
def risk(df, time_period):
    if time_period == 0:
        df_stdev = df.copy()
        risk = df_stdev["monthly return(%)"][1:].std()
        risk_Y = risk*12**0.5
        risk_Y = round(risk_Y,2)
        ar = df_stdev["monthly return(%)"][1:].mean()*12
        std = risk_Y

        # Sharpe
        sp = ar/std
        sp = round(sp,2)

        # sortino
        sortino = []
        for i in range(1,len(df_stdev)):
            if df_stdev["monthly return(%)"][i] < 0:
                sortino.append(df_stdev["monthly return(%)"][i])
            else:
                sortino.append(0)

        std_D = DataFrame(sortino).std()*12**0.5
        st = ar/std_D
        st = round(st,3)
    elif time_period == 1:
        df_stdev = df.copy()
        risk = df_stdev["monthly return(%)"][1:].std()
        risk = round(risk,2)
        ar = df_stdev["monthly return(%)"][1:].mean()
        std = risk

        # Sharpe
        sp = ar/std
        sp = round(sp,2)

        # sortino
        sortino = []
        for i in range(1,len(df_stdev)):
            if df_stdev["monthly return(%)"][i] < 0:
                sortino.append(df_stdev["monthly return(%)"][i])
            else:
                sortino.append(0)

        std_D = DataFrame(sortino).std()
        st = ar/std_D
        st = round(st,3)
    return std, sp, st[0]

In [248]:
# MDD input:monthly return
def MDD(df):
    df = (df["monthly return(%)"][0:]/100).copy()
    df = df.to_frame()
#     dr=df.pct_change(1)
    r = df.add(1).cumprod()
    dd = r.div(r.cummax()).sub(1)
    mdd = dd.min()
    end = dd.idxmin()
    start = r.loc[:end[0]].idxmax()
    days = end-start
    return round(mdd[0]*100,2), start[0], end[0], days[0]

In [249]:
# print(risk(asset_profit_fixed))
# print(risk(asset_profit_drift))
# print(CAGR(asset_profit_fixed))
# print(CAGR(asset_profit_drift))
# print(MDD(asset_profit_fixed))
# print(MDD(asset_profit_drift))

# no contribution

In [250]:
def no_contribution_cost(df):
    TotalCost_list=[]
    for i in range(1,len(df)+1):
        TotalCost = PMT
        TotalCost_list.append(TotalCost)
    df['total_cost'] = TotalCost_list
    return df

In [251]:
# 各支股票股數計算
def no_contribution_cal_units(stock, df, PMT, ratio):
    unit_name_list = []
    for k in range(len(stock)):
        unit_list = []
        unit_name_list.append("unit"+str(k+1))
        unit_list = [PMT*ratio[k]/df[df.columns[k]].iloc[0]]
        for i in df[df.columns[k]].iloc[1:]:
            unit = unit_list[-1]
            unit = unit
            unit_list.append(unit)
        df[unit_name_list[k]] = unit_list
    return df, unit_name_list

In [252]:
# 各支股票淨值計算
def no_contribution_pre_NAV(stock, df, PMT, ratio, unit_name_list):
    pre_NAV_name_list = []
    for k in range(len(stock)):
        pre_NAV_list = [0]
        pre_NAV_name_list.append("pre NAV"+str(k+1))
        for i,j in enumerate(df[df.columns[k]].iloc[1:]):
            pre_NAV = df[unit_name_list[k]].iloc[i]*j
            pre_NAV_list.append(pre_NAV)
        df[pre_NAV_name_list[k]] = pre_NAV_list
    return df, pre_NAV_name_list

In [253]:
def no_contribution_final(stock, df, PMT, pre_NAV_name_list):
    df["final"] = df[pre_NAV_name_list[0]].copy()
    df["final"][0] = PMT
    for i in range(1,len(stock)):
        df["final"] += df[pre_NAV_name_list[i]]
    return df

In [254]:
# monthly_return
def no_contribution_monthly_return(df, PMT):
    m_return=df.copy()
    monthly_return=[(m_return["final"][0]/PMT)-1]
    for i in range(1,len(m_return)):
        mreturn=((m_return["final"][i])/m_return["final"][i-1])-1
        monthly_return.append(mreturn*100)

    df["monthly return(%)"]=monthly_return
    return df

In [255]:
# no_contribution_asset_profit
def no_contribution_asset_profit(stock, stock_df, PMT, ratio):
    # 每月固定比例買進-----[start]
    
    # 計算total cost
    no_contribution_stock_df_cost = no_contribution_cost(stock_df)

    # 各支股票股數計算
    no_contribution_stock_units = no_contribution_cal_units(stock, no_contribution_stock_df_cost, PMT,ratio)
    no_contribution_stock_df_unit = no_contribution_stock_units[0]
    no_contribution_unit_name_list = no_contribution_stock_units[1]
    
    # 各支股票淨值計算
    no_contribution_stock_pre_NAV = no_contribution_pre_NAV(stock, no_contribution_stock_df_unit, PMT, ratio, no_contribution_unit_name_list)
    no_contribution_stock_df_pre_NAV = no_contribution_stock_pre_NAV[0]
    no_contribution_pre_NAV_name_list = no_contribution_stock_pre_NAV[1]
    
    # 計算每月累計損益 'final'
    no_contribution_stock_df_final = no_contribution_final(stock, no_contribution_stock_df_pre_NAV, PMT, no_contribution_pre_NAV_name_list)
    
    # monthly_return
    no_contribution_stock_df = no_contribution_monthly_return(no_contribution_stock_df_final, PMT)          
    # 每月固定比例買進-----[end]      
    
    return no_contribution_stock_df

In [256]:
# no_contribution_profit=no_contribution_asset_profit(stock, price, PMT, ratio)
# no_contribution_profit
# # price

In [257]:
# print(risk(no_contribution_profit))

# print(CAGR(no_contribution_profit))

# print(MDD(no_contribution_profit))


In [270]:
def result(df, stock, time_period):
    profit_final = round(df['final'][-1])
    profit_CAGR = CAGR(df, time_period) 
    RISK = risk(df, time_period)
    Stdev = RISK[0]
    Sharpe = RISK[1]
    Sortino = RISK[2]
    profit_MDD = MDD(df)[0]
    MDD_date_range=str(MDD(df)[1].strftime('%Y-%m-%d')) +'~'+ str(MDD(df)[2].strftime('%Y-%m-%d'))
    if len(stock)==1:
        d = pd.DataFrame({'總損益':profit_final, 'CAGR(%)':profit_CAGR, 'Stdev(%)':Stdev, 'Sharpe':Sharpe, 'Sortino':Sortino, 'MDD(%)':profit_MDD}, index = [stock[0]])
    else:
        d = pd.DataFrame({'總損益':profit_final,'CAGR(%)':profit_CAGR, 'Stdev(%)':Stdev, 'Sharpe':Sharpe, 'Sortino':Sortino, 'MDD(%)':profit_MDD}, index = ['Portfolio'])
    return print(d)

In [274]:
# input
start = '2020-1-1'
end = '2022-12-31'
ratio = [0.5,0.25,0.25]
stock = ['GOOGL','TSLA','AMZN']
contribution_amount = 10000
cashflows = 'Contribute fixed amount'
contribution_ratio_type = 'Flexible'
time_period = 1

# cashflows:'None'= 0 ,'Contribute fixed amount' = 1
# contribution_ratio_type:'Fixed' = 0, 'Flexible' = 1
PMT = contribution_amount

In [276]:
# def output(stock,start,end,PMT, ratio)
price = get_stock_price(stock,start,end,time_period)


if cashflows == 'None':
    no_contribution_profit=no_contribution_asset_profit(stock, price, PMT, ratio)
    print("=================================================================") 
    print('現金流 : 單筆投入')
    print('投入配比類別 : 無')
    print("標的 :",stock)
    print("回測區間 :",start,'~',end)
    print("=================================================================") 
    result(no_contribution_profit, stock, time_period)

elif cashflows == 'Contribute fixed amount':
    profit = asset_profit(stock,price,PMT,ratio)
    if contribution_ratio_type == 'Fixed':
        print("=================================================================") 
        print('現金流 : 定期定額')
        print('投入配比類別 : 固定')
        print("標的 :",stock)
        print("回測區間 :",start,'~',end)
        print("=================================================================") 
        asset_profit_fixed = profit[0]
        print(asset_profit_fixed)
        result(asset_profit_fixed, stock, time_period)

    elif contribution_ratio_type == 'Flexible':
        print("=================================================================") 
        print('現金流 : 定期定額')
        print('投入配比類別 : 變動')
        print("標的 :",stock)
        print("回測區間 :",start,'~',end)
        print("=================================================================") 
        asset_profit_drift = profit[1]
        print(asset_profit_drift)
        result(asset_profit_drift, stock, time_period)
    else:
        print('contribution_ratio_type輸入有誤')

else:
    print('cashflows輸入有誤')

[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
現金流 : 定期定額
投入配比類別 : 變動
標的 : ['GOOGL', 'TSLA', 'AMZN']
回測區間 : 2020-1-1 ~ 2022-12-31
                 GOOGL        TSLA        AMZN  total_cost       unit1  \
Date                                                                     
2019-12-31   66.969498   27.888666   92.391998       10000   74.660856   
2020-12-31   87.632004  235.223328  162.846497       20000   97.966877   
2021-12-31  144.852005  352.260010  166.716995       30000  113.884933   
2022-12-31   88.230003  123.180000   84.000000       40000  151.398480   

                 unit2      unit3      pre NAV1      pre NAV2     pre NAV3  \
Date                                                                         
2019-12-31   89.642150  27.058620      0.000000      0.000000     0.000000   
2020-12-31  117.624710  35.5051

A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df["final"][i] += PMT
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  stock_m_df_2[pre_NAV_name_list[j]].iloc[i] = stock_m_df_2[unit_name_list[j]].iloc[i-1]*stock_m_df_2[stock[j]].iloc[i]
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  stock_m_df_2['final'][i] = final_l[i]
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus

In [None]:
asas='[1,2,3,4]'
asas1=[]
asas=list(asas)
asas.remove('[')
asas.remove(']')
for i in range(asas.count(',')):
    asas.remove(',')
for j in range(len(asas)):
    a = int(asas[j])
    asas1.append(a)
print(type(asas1[1]))