# Package

In [None]:
import pandas as pd
import numpy as np
from datetime import datetime
import matplotlib.pyplot as plt
import matplotlib
import seaborn as sns
import textwrap
import swifter
from pandas.plotting import register_matplotlib_converters
register_matplotlib_converters()

%matplotlib inline

# Data

In [None]:
def price_date_transform(CSV_date,index=False):
    '''
    Convert a slash-separated date string taken from a price CSV.

    CSV_date: the date text. With index left as False it is read as US style
        M/D/YYYY (month and day may have one or two digits). Otherwise it is
        read as fixed-width DD/MM/YYYY.
    index: when False (default) the date is returned as a "%d/%b/%Y" string,
        otherwise the pd.Timestamp itself is returned.
    '''
    if index!=False:
        # Fixed-width layout: day in [0:2], month in [3:5], year in the last four characters
        year_txt,month_txt,day_txt=CSV_date[-4:],CSV_date[3:5],CSV_date[:2]
        return pd.Timestamp(int(year_txt),int(month_txt),int(day_txt))

    # Variable-width layout: the first slash closes the month; the search for the
    # second slash starts at position 3 so that it skips the first one
    month_end=CSV_date.find("/")
    day_start=CSV_date.find("/",1)+1
    day_end=CSV_date.find("/",3)
    stamp=pd.Timestamp(int(CSV_date[day_end+1:]),
                       int(CSV_date[:month_end]),
                       int(CSV_date[day_start:day_end]))
    return stamp.strftime("%d/%b/%Y")

In [None]:
def fundamental_date_transform(CSV_date):
    '''
    Parse a date string from the fundamental data into a pd.Timestamp.

    Two layouts are recognised:
      - text containing '-': fixed-width yyyy-mm-dd
      - anything else: slash separated, read as day / month / year
        (day and month may have one or two digits)
    NOTE(review): the slash layout takes the FIRST field as the day, i.e. it is
    not US month-first order - confirm this matches the source data.
    '''
    if '-' not in CSV_date:
        day_end=CSV_date.find("/")
        month_start=CSV_date.find("/",1)+1
        # searching from position 3 lands on the second slash
        month_end=CSV_date.find("/",3)
        return pd.Timestamp(int(CSV_date[month_end+1:]),
                            int(CSV_date[month_start:month_end]),
                            int(CSV_date[:day_end]))

    return pd.Timestamp(int(CSV_date[:4]),int(CSV_date[5:7]),int(CSV_date[8:]))

In [4]:
def CSV_price(region,start,end,VWAP):
    '''
    Load the yearly price CSV files and the index price CSV for a region.

    region: region label substituted into the file names (US, Europe, Asia or Canada)
    start,end: first and last year to load (both included)
    VWAP: when False the "<region>_price_<year>.csv" files are read,
        otherwise the "<region>_VWAP_<year>.csv" files

    return (price, abs_return, price_index, abs_return_index):
        price - numeric dataframe indexed by ticker, columns relabelled with price_date_transform
        abs_return - one-column-step percentage change of price along the columns
        price_index - transposed index CSV with '#N/A N/A' blanked out, columns relabelled
            with price_date_transform
        abs_return_index - one-column-step percentage change of price_index along the columns
    '''

    # --- stock prices: one file per year, concatenated side by side ---
    yearly_frames=[]
    for year in range(start,end+1):
        if VWAP==False:
            path=r"C:\Users\Eric.Li\OneDrive\Post result data\{0} CSV\{0}_price_{1}.csv".format(region,year)
        else:
            path=r"C:\Users\Eric.Li\OneDrive\Post result data\{0} CSV\{0}_VWAP_{1}.csv".format(region,year)

        raw=pd.read_csv(path).dropna(how='all',axis=0).dropna(how='all',axis=1)
        by_ticker=raw.set_index("Ticker")

        # keep rows whose ticker label is a string, then blank out the error markers
        cleaned=by_ticker.loc[[label for label in by_ticker.index if type(label)==str]]
        for marker in ('#N/A N/A',' #N/A N/A ','#N/A Invalid Security'):
            cleaned=cleaned.replace(marker,'')

        # drop rows with an empty ticker label
        cleaned=cleaned.loc[[label for label in cleaned.index if len(label)>0]]
        yearly_frames.append(cleaned)

    price=pd.concat(yearly_frames,axis=1,sort=True)
    price=price.apply(pd.to_numeric,axis=1)
    price.columns=[price_date_transform(label) for label in price.columns]

    # --- index prices ---
    index_path=r"C:\Users\Eric.Li\OneDrive\Post result data\{0} CSV\{0}_price_index.csv".format(region)
    index_raw=pd.read_csv(index_path).dropna(how='all',axis=0)
    price_index=index_raw.set_index("Ticker").T.replace('#N/A N/A','')
    price_index.columns=[price_date_transform(label) for label in price_index.columns]

    # --- returns along the column (date) axis ---
    abs_return=price.diff(1,axis=1)/price.shift(1,axis=1)
    abs_return_index=price_index.diff(1,axis=1)/price_index.shift(1,axis=1)
    return price,abs_return,price_index,abs_return_index

In [None]:
def CSV_EPS(region,start,end):
    '''
    Load the yearly EPS CSV files for a region into one dataframe.

    region: region label substituted into the file names (US, Europe, Asia or Canada)
    start,end: first and last year to load (both included)

    return the numeric EPS dataframe indexed by ticker, with the columns
    relabelled by price_date_transform
    '''
    yearly_frames=[]
    for year in range(start,end+1):
        path=r"C:\Users\Eric.Li\OneDrive\Post result data\{0} CSV\{0}_EPS_{1}.csv".format(region,year)
        by_ticker=pd.read_csv(path).set_index("Ticker")

        # keep rows whose ticker label is a string, then blank out the error markers
        cleaned=by_ticker.loc[[label for label in by_ticker.index if type(label)==str]]
        for marker in ('#N/A N/A'," #N/A N/A "):
            cleaned=cleaned.replace(marker,'')
        cleaned=cleaned.dropna(how='all',axis=0).dropna(how='all',axis=1)

        # drop rows with an empty ticker label
        cleaned=cleaned.loc[[label for label in cleaned.index if len(label)>0]]
        yearly_frames.append(cleaned)

    EPS=pd.concat(yearly_frames,axis=1,sort=True)
    EPS=EPS.apply(pd.to_numeric,axis=1)
    EPS.columns=[price_date_transform(label) for label in EPS.columns]
    return EPS

In [14]:
def CSV_fundamentals(region,price,EPS_df,revision_period,min_history,min_vol,use_cache):
    '''
    Build (or reload from cache) the post result fundamental dataframe for a region.

    region: region label substituted into the CSV file names (US, Europe, Asia or Canada)
    price: price dataframe indexed by ticker with date-string columns; used for the
        history count and for the 30d volatility filter
    EPS_df: EPS dataframe handed to revision_calc
    revision_period: second element of the period tuple passed to revision_calc for
        "Revision_real"; also part of the cache file name
    min_history: minimum count of non-missing prices required within the last
        2*min_history price columns up to the result date
    min_vol: minimum "30d_vol" a row needs in order to be kept
    use_cache: boolean, if True we just read the last cache of fundamental_df

    return the post result fundamental dataframe; when rebuilt it is indexed by
    (ticker, date, next date, business quarter end) and is also written to the cache file
    '''
    
    if use_cache is True:
        try:
            data=pd.read_csv(r'C:\Users\Eric.Li\OneDrive\Post result data cache\{0}{1}_fundamental_df.csv'.format(region,\
                                                                                                        str(revision_period)))
            # the first four CSV columns hold the levels of the multi-index written by to_csv below
            new_index=pd.MultiIndex.from_tuples(list(zip(data.iloc[:,0],data.iloc[:,1],data.iloc[:,2],data.iloc[:,3])))
            data.index=new_index
            target_data=data.iloc[:,4:]
        # NOTE(review): this bare except hides every error, and target_data stays unassigned
        # afterwards, so the final return raises UnboundLocalError - consider re-raising here
        except:
            print("No such file!")
    else:
        # import the raw fundamental_df and clean up all the nonsense
        csv=pd.read_csv(r'C:\Users\Eric.Li\OneDrive\Post result data cache\{0}_raw_fundamental_df.csv'.format(region))
        data=csv.set_index("Ticker").drop_duplicates().replace('#N/A Invalid Security','').\
        replace('#N/A Requesting Data...','')
        data=data[data.index!='']
        data=data.dropna(how="all")

        # Manipulate the data to get the next earning date, quarter end date, finally generate multi-index for the dataframe 
        data["date_copy"]=[fundamental_date_transform(i) for i in data["Date"].copy()]
        data["ticker_copy"]=data.index
        data=data.copy().sort_values(by=["ticker_copy","date_copy"])
        # after the sort, shifting by -1 puts the following row's date and ticker next to each row
        data["next_date"]=data["date_copy"].shift(-1)
        data["ticker_copy"]=data["ticker_copy"].shift(-1)
        # the datetime.now() prints are progress timestamps around the slow row-wise applies
        print(datetime.now())
        data["Date"]=data["date_copy"].copy().swifter.apply(lambda x: x.strftime("%d/%b/%Y") if x!='' else np.nan)
        print(datetime.now())
        data["Orig date"]=data["Orig date"].copy().swifter.apply(lambda x: pd.Timestamp(x).strftime("%d/%b/%Y")\
                                                          if x!='' else np.nan)
        print(datetime.now())
        # "Next" is only filled when the following row belongs to the same ticker
        data["Next"]=data.swifter.apply(lambda x: x["next_date"].strftime("%d/%b/%Y") \
                                        if type(x["next_date"])==pd.Timestamp and \
                                x.name==x["ticker_copy"] else np.nan,axis=1)
        print(datetime.now())
        
#         data["period"]=data.apply(lambda x:str(pd.Timestamp(datetime.strptime(x["Date"],"%d/%b/%Y")).year)\
#                                             +" "+str(pd.Timestamp(datetime.strptime(x["Date"],"%d/%b/%Y")).quarter),\
#                                             axis=1)

        # roll each result date forward to a business quarter end
        data["end_period"]=data.swifter.apply(lambda x: pd.offsets.BQuarterEnd().rollforward(x["date_copy"])\
                                              .strftime("%d/%b/%Y"),\
                                      axis=1)

        # index levels: (ticker, Date, Next, end_period)
        data.index=pd.MultiIndex.from_tuples(list(zip(data.index,data["Date"],data["Next"],data["end_period"])))

        # helper columns are no longer needed once the index is built
        del data["ticker_copy"]
        del data["date_copy"]
        del data["next_date"]
        del data["end_period"]
        del data["Next"]

        # convert the numeric columns when present; a missing column is skipped
        for s in ["Market cap","Volume"]:
            try:
                data[s]=pd.to_numeric(data[s])
            except KeyError:
                pass

        '''
        Add more forward look and realistic versions of earning revision
        '''
        # x.name[0] is the ticker and x.name[1] the result date string from the multi-index
        print(datetime.now())
        data["Revision_real"]=data.swifter.apply(lambda x: revision_calc(x.name[0],x.name[1],EPS_df,(0,revision_period)),axis=1)
        print(datetime.now())
        data["Revision_20"]=data.swifter.apply(lambda x: revision_calc(x.name[0],x.name[1],EPS_df,(0,20)),axis=1)
        print(datetime.now())

        '''
        take out data with zero or none revision/market cap
        '''
        data=data[(data["Market cap"]>=500)] #universe above 500mn
        # the two comparisons together are False only for missing values, so this drops rows without a revision
        data=data[(data["Revision_20"]>=0)|(data["Revision_20"]<0)]
        
        '''
        take out cases where there is a short history
        '''
        # count of non-missing prices in the last 2*min_history columns up to the result date;
        # None when the date or the ticker is not present in price
        count_history=data.swifter.apply(lambda x: price.loc[x.name[0],:x.name[1]][-2*min_history:].count() if x.name[1] in \
                                   price.columns and x.name[0] in price.index else None,axis=1)
        
        data=data.copy()[count_history>=min_history]
         

        '''
        Add historic volatility
        '''
        
        abs_return=price.diff(1,axis=1)/price.shift(1,axis=1)
        print(datetime.now())
        # std of the 30 returns that precede the last column of the slice ending at the result date;
        # None when the ticker is missing or that window holds no data
        data["30d_vol"]=data.swifter.apply(lambda x: abs_return.loc[x.name[0],:x.name[1]][-31:-1].std() \
                                           if x.name[0] in abs_return.index\
                                   and abs_return.loc[x.name[0],:x.name[1]][-31:-1].dropna().shape[0]!=0 else None,axis=1)      
        print(datetime.now())
        data=data[data["30d_vol"]>=min_vol]
        
        '''
        Final cleaning and export the data
        '''
        target_data=data.drop_duplicates()
        # this file is what the use_cache branch above reads back
        target_data.to_csv(r'C:\Users\Eric.Li\OneDrive\Post result data cache\{0}{1}_fundamental_df.csv'.format(region,\
                                                                                                      str(revision_period)))
    return target_data

# Util function

In [9]:
def revision_calc(ticker,date,EPS_df,period):
    '''
    Calculate the EPS revision of a ticker around a reference date.

    The revision is (first available EPS in the post window) divided by
    (last available EPS in the pre window), minus 1, where
        pre window  = the 10 positions before the reference date
        post window = positions day0+period-1 up to day0+period+9

    ticker: row label in EPS_df
    date: reference date as pd.Timestamp or "%d/%b/%Y" string; a missing date
        (NaT/NaN/None) gives None
    EPS_df: EPS dataframe with all the historical data, indexed by ticker with
        date-string columns
    period: number of positions after the reference date at which the post window starts

    return the revision, or None when the ticker or date is unknown, when either
    window has no data, or when the pre-window EPS is zero
    '''
    # type(date)==pd.NaT can never be true (NaT is a value, not a type), so test for missing values explicitly
    if pd.isna(date):
        return None
    if isinstance(date,pd.Timestamp):
        date=date.strftime("%d/%b/%Y")

    if ticker not in EPS_df.index:
        return None

    eps_series=EPS_df.loc[ticker]
    date_series=eps_series.index.tolist()
    if date not in date_series:
        return None

    day0=date_series.index(date)
    # clamp the window bounds at 0: a negative bound would be read by iloc as an
    # offset from the END of the series and select the wrong (usually empty) range
    post_values=eps_series.iloc[max(day0+period-1,0):max(day0+period+10,0)].dropna()
    pre_values=eps_series.iloc[max(day0-10,0):day0].dropna()

    if len(post_values)==0 or len(pre_values)==0 or pre_values.iloc[-1]==0:
        return None

    try:
        return np.divide(post_values.iloc[0],pre_values.iloc[-1])-1
    except (TypeError,ValueError,ZeroDivisionError):
        # non-numeric EPS entries cannot be divided
        return None

In [None]:
def revision_calc(ticker,date,EPS_df,period_tuple):
    '''
    Calculate percentage EPS revision from the period tuple.

    period_tuple=(offset,length). Day starts from zero, so offset 1 means the
    calculation starts one day after the result; length is the number of days.
    The start value is the last available EPS strictly before position
    day0+offset and the end value is the last available EPS strictly before
    position day0+offset+length (positions count non-missing EPS entries only).
    With a negative offset reaching before the first entry, the first
    available EPS is used as the start value.

    ticker: row label in EPS_df
    date: result date as pd.Timestamp or "%d/%b/%Y" string; a float (NaN) gives None
    EPS_df: EPS dataframe indexed by ticker with date-string columns

    return (end-start)/abs(start), or None when the ticker or date is unknown,
    when a window holds no data, or when the start value is zero
    '''
    if isinstance(date,float):
        return None
    if isinstance(date,pd.Timestamp):
        date=date.strftime("%d/%b/%Y")

    # an unknown ticker is treated like an unknown date instead of raising KeyError
    if ticker not in EPS_df.index:
        return None

    eps_series=EPS_df.loc[ticker].dropna()
    date_series=eps_series.index.tolist()
    if date not in date_series:
        return None

    day0=date_series.index(date)
    offset,length=period_tuple[0],period_tuple[1]

    if offset<0 and len(eps_series.loc[:date].dropna())-1<=abs(offset):
        start_window=eps_series.iloc[:1]
    else:
        # clamp at 0 so that a negative stop is not read as "count from the end"
        start_window=eps_series.iloc[:max(day0+offset,0)]
    end_window=eps_series.iloc[:max(day0+offset+length,0)]

    # an empty window (e.g. the very first EPS date with offset 0) has no value to read
    if len(start_window)==0 or len(end_window)==0:
        return None

    start=start_window.iloc[-1]
    end=end_window.iloc[-1]
    if start==0:
        return None
    return (end-start)/abs(start)

In [40]:
def return_calc(ticker,date,price_df,index_df,period_tuple,abs_rel):
    '''
    Calculate percentage return from the period tuple.

    period_tuple=(offset,length). Day starts from zero, so offset 1 means the
    return calculation starts one day after the result; length is the number of days.
    The calculation assumes entry on the close price of the day prior to the day
    when the count starts: the start price is the last available price strictly
    before position day0+offset, the end price the last available price strictly
    before position day0+offset+length. With a negative offset reaching before
    the first price, the first available price is used as the start.

    ticker: row label in price_df (KeyError if absent)
    date: result date as pd.Timestamp or "%d/%b/%Y" string; a float (NaN) gives None
    price_df: price dataframe indexed by ticker with date-string columns
    index_df: index level series keyed by date string; only used for relative returns
        (assumed to be a pd.Series - the zero check below needs scalar values)
    abs_rel: 'abs' for the absolute return, anything else for the return relative to the index

    return the return as a float, or None when the date has no price for the
    ticker, when a window holds no data, or when a start level is zero.
    Raises ValueError for relative returns when the date is missing from index_df.
    '''
    def _last_value(series,stop):
        # last non-missing value strictly before position `stop`; None for an empty window.
        # `stop` is clamped at 0 so that a negative value is not read as "count from the end"
        window=series.iloc[:max(stop,0)].dropna()
        return window.iloc[-1] if len(window)>0 else None

    def _pct_change(start,end):
        if start is None or end is None or start==0:
            return None
        return end/start-1

    if isinstance(date,float):
        return None
    if isinstance(date,pd.Timestamp):
        date=date.strftime("%d/%b/%Y")

    price_series=price_df.loc[ticker].dropna()
    date_series=price_series.index.tolist()
    if date not in date_series:
        return None

    offset,length=period_tuple[0],period_tuple[1]
    day0=date_series.index(date)

    if offset<0 and len(price_series.loc[:date].dropna())-1<=abs(offset):
        start_price=price_series.iloc[0]
    else:
        start_price=_last_value(price_series,day0+offset)
    end_price=_last_value(price_series,day0+offset+length)
    abs_return=_pct_change(start_price,end_price)

    if abs_rel=='abs':
        return abs_return

    day0_index=index_df.index.tolist().index(date)
    if offset<0 and len(index_df.loc[:date].dropna())-1<=abs(offset):
        start_index=index_df.dropna().iloc[0]
    else:
        start_index=_last_value(index_df,day0_index+offset)
    # the end level is needed on both paths; it used to be assigned on the else path only
    end_index=_last_value(index_df,day0_index+offset+length)
    index_return=_pct_change(start_index,end_index)

    if abs_return is None or index_return is None:
        return None
    return abs_return-index_return

In [41]:
def return_calc_log(ticker,date,price_df,index_df,period_tuple,abs_rel):
    '''
    Calculate log return from the period tuple.

    period_tuple=(offset,length). Day starts from zero, so offset 1 means the
    return calculation starts one day after the result; length is the number of days.
    The calculation assumes entry on the close price of the day prior to the day
    when the count starts: the start is the last available log price strictly
    before position day0+offset, the end the last available log price strictly
    before position day0+offset+length. With a negative offset reaching before
    the first price, the first available price is used as the start.

    ticker: row label in price_df (KeyError if absent)
    date: result date as pd.Timestamp or "%d/%b/%Y" string; a float (NaN) gives None
    price_df: price dataframe indexed by ticker with date-string columns
    index_df: index levels keyed by date string; only used for relative returns
    abs_rel: 'abs' for the absolute log return, anything else for the log return
        relative to the index

    return the log return, or None when the date has no price for the ticker or
    when a window holds no data.
    Raises ValueError for relative returns when the date is missing from index_df.
    '''
    def _last_value(series,stop):
        # last non-missing value strictly before position `stop`; None for an empty window.
        # `stop` is clamped at 0 so that a negative value is not read as "count from the end"
        window=series.iloc[:max(stop,0)].dropna()
        return window.iloc[-1] if len(window)>0 else None

    if isinstance(date,float):
        return None
    if isinstance(date,pd.Timestamp):
        date=date.strftime("%d/%b/%Y")

    price_series=np.log(price_df.loc[ticker].dropna())
    date_series=price_series.index.tolist()
    if date not in date_series:
        return None

    offset,length=period_tuple[0],period_tuple[1]
    day0=date_series.index(date)

    if offset<0 and len(price_series.loc[:date].dropna())-1<=abs(offset):
        start_price=price_series.dropna().iloc[0]
    else:
        start_price=_last_value(price_series,day0+offset)
    end_price=_last_value(price_series,day0+offset+length)

    # a log return is a difference, so no zero guard is needed: a log price of 0
    # simply means a price of 1.0 and must not invalidate the result
    if start_price is None or end_price is None:
        abs_return=None
    else:
        abs_return=end_price-start_price

    if abs_rel=='abs':
        return abs_return

    log_index=np.log(index_df)
    day0_index=index_df.index.tolist().index(date)
    if offset<0 and len(index_df.loc[:date].dropna())-1<=abs(offset):
        start_index=log_index.dropna().iloc[0]
    else:
        start_index=_last_value(log_index,day0_index+offset)
    # the end level is needed on both paths; it used to be assigned on the else path only
    end_index=_last_value(log_index,day0_index+offset+length)

    if abs_return is None or start_index is None or end_index is None:
        return None
    return abs_return-(end_index-start_index)

In [None]:
def quarter_transform(quarter):
    '''
    Map a raw period label to one of Q1..Q4 using its last two characters.

    Q4 or :A -> Q4, Q3 or C3 -> Q3, Q2 or C2 or S1 -> Q2, Q1 or C1 -> Q1.
    A float input (e.g. NaN) or an unrecognised ending gives None.
    '''
    if type(quarter)==float:
        return None

    ending=quarter[-2:]
    endings_by_quarter=(
        ('Q4',('Q4',':A')),
        ('Q3',('Q3','C3')),
        ('Q2',('Q2','C2','S1')),
        ('Q1',('Q1','C1')),
    )
    for adj_quarter,endings in endings_by_quarter:
        if ending in endings:
            return adj_quarter
    return None

In [None]:
def calc_beta(ticker,date,return_df,index_df,length):
    '''
    Calculate the beta of an individual stock against the index.

    Both series are cut at date (label slice) and the `length` observations
    preceding the last one are used; beta is cov(stock,index)/var(index).
    '''
    stock_window=return_df.loc[ticker,:date].iloc[-length-1:-1]
    index_window=index_df.loc[:date].iloc[-length-1:-1]

    covariance=np.cov(stock_window,index_window)
    return covariance[0][1]/covariance[1][1]

# Signal functions

In [None]:
def signal_vol(signal_column,return_df,vol_lookback):
    '''
    Calculate simple vol (standard deviation of returns) ahead of a signal.

    signal_column: object whose .name is a tuple starting with (ticker, entry date)
    return_df: return dataframe indexed by ticker with date columns
    vol_lookback: number of observations to look back from the entry date

    return the standard deviation of the returns in the window that ends just
    before the entry date. When fewer than vol_lookback observations exist the
    available history is used.
    Raises KeyError for an unknown ticker and ValueError for an unknown date.
    '''
    ticker,entry_date=signal_column.name[0],signal_column.name[1]
    return_series=return_df.loc[ticker]
    location=return_series.index.tolist().index(entry_date)
    vol_range=min(vol_lookback,location)
    # clamp at 0: with a short history location-vol_range-1 is -1, which a
    # positional slice reads as "last element" and which produced an empty window
    window_start=max(location-vol_range-1,0)
    return return_series.iloc[window_start:location].std()

In [None]:
def index_vol(date,index_df,vol_lookback):
    '''
    Calculate simple vol (standard deviation) of the index ahead of a date.

    date: label in index_df's index
    index_df: index return data keyed by date
    vol_lookback: number of observations to look back from the date

    return the standard deviation over the window that ends just before the
    date. When fewer than vol_lookback observations exist the available
    history is used.
    Raises ValueError when the date is not in the index.
    '''
    location=index_df.index.tolist().index(date)
    vol_range=min(vol_lookback,location)
    # clamp at 0: with a short history location-vol_range-1 is -1, which iloc
    # reads as "last element" and which produced an empty window
    window_start=max(location-vol_range-1,0)
    return index_df.iloc[window_start:location].std()

In [None]:
def slice_universe(signal_df,start_datetime,end_datetime,old_position):
    '''
    Slice signal_df to the rows between start_datetime and end_datetime.

    old_position: when True every signal column is kept; otherwise only the
        columns whose entry date (second element of the column name, in
        "%d/%b/%Y" format) also lies within the period are kept.

    The result is laid out on the business days of the period: duplicate rows
    are dropped first and the frame is then aligned with a business-day
    calendar column, which is removed again before returning.
    '''
    window=signal_df.loc[start_datetime:end_datetime]

    if old_position is not True:
        entry_dates=window.apply(lambda column:datetime.strptime(column.name[1],"%d/%b/%Y"),axis=0)
        entered_in_period=(entry_dates>=start_datetime)&(entry_dates<=end_datetime)
        window=window.loc[:,entered_in_period]

    business_days=pd.date_range(start_datetime,end_datetime,freq='B')
    calendar=pd.Series(1,index=business_days).to_frame()

    # the calendar is the last column after the concat, so [:-1] removes it again
    aligned=pd.concat([window.drop_duplicates(),calendar],axis=1)
    return aligned.iloc[:,:-1]
    

In [None]:
def signal_filter_stop(signal_df,stop_level,return_df,vol_lookback,stop_type,index_df):
    '''
    Get the updated signal df after the stop loss.

    signal_df: signal return dataframe, one column per signal; the column name
        starts with (ticker, entry date)
    stop_level: stop distance expressed in multiples of each signal's vol
    return_df: return dataframe handed to signal_vol
    vol_lookback: lookback handed to signal_vol
    stop_type: 'abs' stops on the cumulative signal return, 'rel' stops on the
        cumulative return relative to the index hedge
    index_df: index return dataframe, only used for 'rel'. With a single column
        that column hedges every signal; otherwise the column is looked up through
        Asia_mapping (defined outside this function) using the last two characters
        of the ticker

    return signal_df with the entries masked (NaN) from the row after the
    vol-scaled cumulative move first falls below -stop_level.
    Raises ValueError for an unknown stop_type (this used to surface as an
    UnboundLocalError on the return statement).
    '''
    if stop_type not in ('abs','rel'):
        raise ValueError("stop_type must be 'abs' or 'rel', got {!r}".format(stop_type))

    vol_row=signal_df.apply(lambda column:signal_vol(column,return_df,vol_lookback),axis=0)

    if stop_type=='abs':
        cum_nmove=((1+signal_df).cumprod()-1).ffill()/vol_row
    else:
        # 1.0 wherever a signal return exists, so that multiplying by the index
        # return gives the hedge return on the days the signal is live
        signal_count=signal_df.copy()
        signal_count[((signal_count)>0) | ((signal_count)<0)]=1.0

        if index_df.shape[1]==1:
            signal_hedge=signal_count.apply(lambda x:x.multiply(index_df.iloc[:,0],axis=0))
        else:
            signal_hedge=signal_count.apply(lambda x:x.multiply(index_df[Asia_mapping.loc[x.name[0][-2:]].iloc[0]],axis=0))

        cum_nmove=((1+signal_df).cumprod()-(1+signal_hedge).cumprod()).ffill()/vol_row

    # shift by one row: the stop takes effect the day after the level is breached
    stopped=cum_nmove.expanding().min().shift(1,axis=0)<-stop_level
    return signal_df[~stopped]

In [None]:
def revision_adjusted_size(reference_signal_df,lower_revision,higher_revision,size_multiple,revision_row,revision_row_reference,\
                           gross,long):
    
    ''' 
    Size each signal linearly in its revision, using positive sizes.

    A revision is mapped onto a size between a lower and a higher size
    (higher = lower * size_multiple) in proportion to its distance from
    lower_revision; revisions whose magnitude exceeds the cap revision
    (higher_revision for longs, lower_revision for shorts) get the higher size.

    The mapping is first applied to the reference signals with a lower size of
    0.01 to measure the average gross it would have produced; both sizes are
    then rescaled so that this average matches gross (in percent) and the
    rescaled mapping is applied to revision_row.

    return (size_row, new_lower_size)
    '''

    lower_size=0.01
    higher_size=lower_size*size_multiple

    if long is True:
        cap_revision=higher_revision
        reference_growth=1+reference_signal_df
    else:
        cap_revision=lower_revision
        reference_growth=1-reference_signal_df

    def _linear_size(revisions,floor_size,ceiling_size):
        # one size per revision: linear between the two sizes, flat at the ceiling beyond the cap
        def _size_of(row):
            revision=row.iloc[0]
            if np.abs(revision)<=np.abs(cap_revision):
                return floor_size+(ceiling_size-floor_size)\
                       *(np.abs(revision-lower_revision))/np.abs(higher_revision-lower_revision)
            return ceiling_size
        return revisions.to_frame().copy().apply(_size_of,axis=1)

    size_row_reference=_linear_size(revision_row_reference,lower_size,higher_size)
    size_df_reference=reference_growth.cumprod()*size_row_reference

    # rescale so that the average reference gross equals the requested gross (given in percent)
    trial_gross=np.abs(size_df_reference.sum(axis=1).mean())
    scale=trial_gross*100/gross
    new_lower_size=lower_size/scale
    new_higher_size=higher_size/scale

    size_row=_linear_size(revision_row,new_lower_size,new_higher_size)
    return size_row, new_lower_size

In [None]:
def sizing(signal_df,reference_signal_df,gross,fundamental_df,new_signal,return_df,risk_parity,liquidity,capital,\
           revision_adjust,long):
    '''
    Calculate the size row for the current signal_df range, using the
    historical (reference) signal range as the benchmark.

    The uncapped size of a signal depends on revision_adjust[0]:
        True       - linear in the revision via revision_adjusted_size;
                     revision_adjust=(True,long_bounds,short_bounds,size_multiple)
                     where the bounds are (lower_revision,higher_revision) pairs.
                     "Revision_real" is used when new_signal is True, "Revision_20" otherwise.
                     (None,None) is returned when the bounds for the requested side are None.
        'constant' - revision_adjust[1] for every signal
        otherwise  - gross/100 divided by the average number of reference signals per row

    With risk_parity the size is divided by the signal's 30 day vol relative to
    the average reference vol. Every size is capped by the liquidity limit
    Volume*liquidity/capital taken from fundamental_df.

    return (size_row, low_size); low_size is only set in the revision adjusted case
    '''

    fundamental_df=fundamental_df.copy().sort_index()
    vol_reference=reference_signal_df.apply(lambda column:signal_vol(column,return_df,30),axis=0).mean()
    vol_row=signal_df.apply(lambda column:signal_vol(column,return_df,30),axis=0)

    def _capped_sizes(base_for):
        # base_for(column) gives the uncapped size of that signal column
        def _size_of(column):
            target=base_for(column)
            if risk_parity is True:
                target=target/(vol_row[column.name]/vol_reference)
            liquidity_cap=fundamental_df.loc[column.name[0],column.name[1]]["Volume"].iloc[0]*liquidity/capital
            return min(target,liquidity_cap)
        return signal_df.apply(_size_of,axis=0)

    mode=revision_adjust[0]

    if mode is True:
        revision_field="Revision_real" if new_signal is True else "Revision_20"
        revision_row_reference=fundamental_df.loc[reference_signal_df.columns][revision_field]
        revision_row=fundamental_df.loc[signal_df.columns][revision_field]

        if long is True and revision_adjust[1] is not None:
            bounds,is_long=revision_adjust[1],True
        elif long is False and revision_adjust[2] is not None:
            bounds,is_long=revision_adjust[2],False
        else:
            return None,None

        base_size,low_size=revision_adjusted_size(reference_signal_df,bounds[0],bounds[1],revision_adjust[3],\
                                                  revision_row,revision_row_reference,gross,is_long)
        return _capped_sizes(lambda column:base_size[column.name]),low_size

    if mode=='constant':
        return _capped_sizes(lambda column:revision_adjust[1]),None

    # equal sizing from the average number of live reference signals
    number=reference_signal_df.count(axis=1).mean()
    avg_size=gross/100/number
    return _capped_sizes(lambda column:avg_size),None

In [None]:
def trading_analytics_date(portfolio_cache):
    '''
    Key portfolio metrics from portfolio cache; feeds the plot function.

    portfolio_cache[1] is the account curve and portfolio_cache[3] the
    individual signal returns.

    return (signal_count,hit_rate,payoff_ratio,ann_ret,ann_vol,ann_sharpe,max_dd),
    or seven None values when there are no signal returns.
    Annualisation uses 260 periods per year.
    '''

    ind_return=portfolio_cache[3]
    signal_count=len(ind_return)
    account_curve=portfolio_cache[1]

    if signal_count==0:
        return (None,)*7

    # average signal return; evaluated but not part of the returned tuple
    mean_return=ind_return.mean()

    winners=ind_return[ind_return>0]
    losers=ind_return[ind_return<0]
    hit_rate=len(winners)/len(ind_return)*1.0
    payoff_ratio=winners.mean()/losers.mean()*-1.0

    equity=account_curve+1
    period_return=equity.diff()/equity.shift(1)
    ann_vol=np.std(period_return)*(260**0.5)
    ann_ret=(equity.iloc[-1]**(1/len(equity)))**260-1
    ann_sharpe=ann_ret/ann_vol

    # deepest shortfall of the equity curve below its running maximum, reported as a positive number
    growth=1+account_curve
    underwater=growth-growth.cummax(axis=0)
    max_dd=-underwater.expanding().min().min()

    return signal_count,hit_rate,payoff_ratio,ann_ret,ann_vol,ann_sharpe,max_dd

In [None]:
def trading_analytics_simp(account_curve):
    '''
    Key portfolio metrics from the portfolio account curve.
    Only sharpe and drawdown are returned; annualisation uses 260 periods per year.
    '''

    equity=account_curve+1
    period_return=equity.diff()/equity.shift(1)

    ann_vol=np.std(period_return)*(260**0.5)
    ann_ret=(equity.iloc[-1]**(1/len(equity)))**260-1
    ann_sharpe=ann_ret/ann_vol

    # deepest shortfall of the equity curve below its running maximum, reported as a positive number
    growth=1+account_curve
    underwater=growth-growth.cummax(axis=0)
    max_dd=-underwater.expanding().min().min()

    return ann_sharpe,max_dd

In [None]:
def plot_signal(title,figsize,portfolio_cache):
    '''
    Plot the account curve of a signal together with its gross exposure.

    title: text shown as the figure super title
    figsize: figure size handed to plt.figure
    portfolio_cache: sequence holding the account curve at [1], the position
        sizes at [2], the gross exposure at [4] and the turnover at [5]; it is
        also passed to trading_analytics_date for the summary metrics

    The account curve is drawn on the left axis (ticks every 0.1, shown as
    percentages), the gross exposure on the right axis (ticks every 0.3), and
    the summary metrics are written into the wrapped axes title.
    '''

    account_curve=portfolio_cache[1]
    avg_size=np.abs(portfolio_cache[2]).mean(axis=0).mean()
    gross=portfolio_cache[4]
    turnover=portfolio_cache[5]

    def _respace_ticks(axis,step,label_format):
        # keep the automatic tick range but use a fixed step and percentage labels
        auto_ticks=axis.get_yticks()
        axis.set_yticks(np.arange(auto_ticks[0],auto_ticks[-1],step))
        axis.set_yticklabels([label_format.format(tick) for tick in axis.get_yticks()])

    fig=plt.figure(figsize=figsize)
    return_axis=fig.add_subplot(1,1,1)
    curve_line=return_axis.plot(account_curve,label='signal',color='b')
    _respace_ticks(return_axis,0.1,"{:.1%}")

    exposure_axis=return_axis.twinx()
    gross_line=exposure_axis.plot(gross,label='gross',color='silver')
    _respace_ticks(exposure_axis,0.3,"{:.0%}")

    count,hit,payoff,ret,vol,sharpe,max_dd=trading_analytics_date(portfolio_cache)

    summary_parts=[
        'count='+str(count),
        ',avg_size='+"{:.1%}".format(avg_size),
        ',hit_rate='+"{:.0%}".format(hit),
        ',payoff='+str(round(payoff,1)),
        ',return='+"{:.1%}".format(ret),
        ',vol='+"{:.1%}".format(vol),
        ',sharpe='+str(round(sharpe,1)),
        ',turnover='+str(round(turnover,1))+'x',
        ',max_drawdown='+"{:.1%}".format(max_dd),
    ]
    plt.title("\n".join(textwrap.wrap("".join(summary_parts))),fontsize=10)

    return_axis.set_xlabel('Year')
    return_axis.set_ylabel('Return')
    exposure_axis.set_ylabel('Exposure')
    plt.suptitle(title,y=1.05,fontsize=16)
    plt.grid(linestyle='dashed')

    # one legend covering the lines of both axes
    lines=curve_line+gross_line
    plt.legend(lines,[line.get_label() for line in lines],loc=2)
    return_axis.axhline(y=0,color='k')

    plt.show()        