In [None]:
from data_tools.api import *
from utilscht.Data import *
import pymysql
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import statsmodels.api as sm
from joblib import Parallel,delayed
import tushare as ts
import datetime
import math
%config InlineBackend.figure_format ='retina'
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

import getpass
import os

START_YEAR=2014

# Connection settings for the Wind MySQL database. Credentials are taken from
# environment variables instead of being hard-coded in the notebook; when
# WIND_DB_PASSWORD is not set the password is prompted for interactively.
# NOTE(review): the password that used to be hard-coded here should be treated
# as exposed (it lives in the notebook history) and rotated.
DB_INFO = dict(host=os.environ.get("WIND_DB_HOST", '192.168.1.234'),
               user=os.environ.get("WIND_DB_USER", 'winduser'),
               password=os.environ.get("WIND_DB_PASSWORD") or getpass.getpass("Wind DB password: "),
               db=os.environ.get("WIND_DB_NAME", 'wind'))

conn = pymysql.connect(**DB_INFO, charset='utf8mb4', cursorclass=pymysql.cursors.DictCursor)

In [None]:
def apply_parallel(df_grouped, func, n_jobs=16, backend='loky', as_index=True, **kwargs):
    """
    Apply ``func`` to every group of a pandas GroupBy in parallel (joblib) and concatenate.

    :param df_grouped: the object returned by pd.DataFrame.groupby(...)
    :param func: callable ``func(group_df, **kwargs)`` returning a DataFrame or Series
    :param n_jobs: number of joblib workers
    :param backend: joblib backend name
    :param as_index: if True, the group keys are used as the outer index level(s)
        of the result (``pd.concat(keys=...)``); if False no keys are added
    :param kwargs: extra keyword arguments forwarded to ``func``
    :return: concatenation of the per-group results; an empty DataFrame when
        ``df_grouped`` has no groups (pd.concat would otherwise raise ValueError)
    """
    names = []
    groups = []
    for name, group in df_grouped:
        names.append(name)
        groups.append(group)

    # pd.concat([]) raises "No objects to concatenate"; an empty input yields an empty result instead.
    if not groups:
        return pd.DataFrame()

    results = Parallel(n_jobs=n_jobs, verbose=5, backend=backend, batch_size='auto')(
        delayed(func)(group, **kwargs) for group in groups)

    return pd.concat(results, keys=names if as_index else None)

In [None]:
# Record the pandas version in use: several groupby(...).apply / fillna calls
# in this notebook may behave differently across pandas versions.
pandas_version = pd.__version__
pandas_version

* 下面三个单元格分别用于：获取全部股票的数据；对全部数据进行处理，得到分析师在每一年的主观偏差；按股票分别存储数据。由于运行耗时较长，已预先运行，并将结果保存在 `/share/intern_share/analyst_est_data/` 目录下。

## 获取计算所需数据

In [None]:

# Industry (L1_INDUSTRY) and market cap (mktcap) for every stock and day.
# DataDate is cast to str so it can be merged with the analyst report dates
# below (assumed to be YYYYMMDD strings in the Wind tables — confirm).
indus_size_df=query_table("DailyBar",start_date="{}0101".format(START_YEAR),end_date="20201231",fields=["L1_INDUSTRY","mktcap"])
indus_size_df["DataDate"]=indus_size_df["DataDate"].apply(lambda x:str(x))

# Analyst net-profit forecasts (ASHAREEARNINGEST), renamed to sid/DataDate,
# sorted, and enriched with the industry / market cap of the forecast date.
sql = """SELECT S_INFO_WINDCODE, EST_DT, REPORTING_PERIOD,RESEARCH_INST_NAME, ANALYST_NAME,EST_NET_PROFIT 
from ASHAREEARNINGEST where EST_DT between '{}0101' and '20201231'""".format(START_YEAR)   
df_est = pd.read_sql_query(sql, conn)
df_est.rename(columns={"S_INFO_WINDCODE":"sid","EST_DT":"DataDate"},inplace=True)
df_est=df_est.sort_values(["sid","DataDate","REPORTING_PERIOD"])
df_est=pd.merge(df_est,indus_size_df,on=["sid","DataDate"],how="left")

# Published financial indicators (deducted profit + extraordinary items) per
# reporting period; these are the actual results the forecasts are compared with.
# NOTE(review): if one reporting period has several announcements (ANN_DT) in
# the window, this left merge duplicates the forecast rows — confirm.
sql = """SELECT S_INFO_WINDCODE, ANN_DT, REPORT_PERIOD,S_FA_DEDUCTEDPROFIT,S_FA_EXTRAORDINARY
from ASHAREFINANCIALINDICATOR where ANN_DT between '{}0101' and '20201231'""".format(START_YEAR)
df_FA_EPS = pd.read_sql_query(sql, conn)
df_FA_EPS.rename(columns={"S_INFO_WINDCODE":"sid","REPORT_PERIOD":"REPORTING_PERIOD"},inplace=True)
df_est=pd.merge(df_est,df_FA_EPS,on=["sid","REPORTING_PERIOD"],how="left")

# Total share count history; keep only the latest record per stock
# (rows are sorted by CHANGE_DT, so iloc[-1] is the most recent change).
# The whole query string is upper-cased, including the table name.
sql="select S_INFO_WINDCODE,TOT_SHR,CHANGE_DT from AShareCapitalization".upper()
df_tot_share=pd.read_sql_query(sql,conn)
df_tot_share.rename(columns={"S_INFO_WINDCODE":"sid"},inplace=True)
df_tot_share=df_tot_share.sort_values(["sid","CHANGE_DT"])
df_now_tot_share=df_tot_share.groupby('sid',as_index=False).apply(lambda x:x.iloc[-1])

# Forward-adjusted EPS: forecast and actual profit are both divided by the
# *latest* total share count, so every period uses today's share base.
# NOTE(review): the actual profit is additionally divided by 10000 while the
# forecast is not — presumably a unit difference between the two Wind tables; confirm.
df_est=pd.merge(df_est,df_now_tot_share,on='sid',how='left')
df_est['EST_EPS_DILUTED']=df_est['EST_NET_PROFIT']/df_est['TOT_SHR']
df_est['S_FA_EPS_DILUTED']=(df_est['S_FA_DEDUCTEDPROFIT']+df_est['S_FA_EXTRAORDINARY'])/df_est['TOT_SHR']/10000

## 计算分析师的主观偏差

In [None]:
# For each (stock, report date, analyst) keep the first row; after the sort by
# REPORTING_PERIOD this is the earliest reporting period in that report.
df_1y_est=df_est.groupby(["sid","DataDate","ANALYST_NAME"],as_index=False).apply(lambda x:x.iloc[0]).reset_index(drop=True)

def _months_to_reporting(data_date, reporting_period):
    """
    Signed number of months from data_date to reporting_period (YYYYMMDD strings).

    Negative when the forecast was issued after the period end (e.g. early in
    the following year); the year difference is included so that a period in a
    later calendar year gets a positive value.
    """
    years = int(reporting_period[0:4]) - int(data_date[0:4])
    return years * 12 + int(reporting_period[4:6]) - int(data_date[4:6])

df_1y_est["Date_to_Reporting"]=[_months_to_reporting(data_date, period)
                                for data_date, period in zip(df_1y_est["DataDate"], df_1y_est["REPORTING_PERIOD"])]

# Relative forecast error |actual - forecast| / |actual|. Taking the absolute
# value of the denominator keeps the ratio non-negative for loss-making periods
# (a negative ratio would later receive the largest analyst weight); a zero
# actual EPS gives NaN instead of inf.
actual_eps = df_1y_est["S_FA_EPS_DILUTED"]
df_1y_est["Analyst_Bias"]=(np.abs(actual_eps-df_1y_est["EST_EPS_DILUTED"])/np.abs(actual_eps)).replace([np.inf,-np.inf],np.nan)

# Per reporting year: regress the bias on industry, market cap and
# months-to-reporting and keep the residual as the "adjusted" bias.
def Regress_by_Year(df):
    """
    Residualise analyst bias for one reporting period.

    :param df: rows of one REPORTING_PERIOD with columns Analyst_Bias,
        L1_INDUSTRY, mktcap and Date_to_Reporting. Not modified.
    :return: copy of the rows where all four columns are non-null, with
        ``adj_analyst_bias`` set to the OLS residuals; empty if no such rows
    """
    # every regressor must be present, otherwise OLS fails on NaN input
    required = ["Analyst_Bias", "L1_INDUSTRY", "mktcap", "Date_to_Reporting"]
    df = df[df[required].notnull().all(axis=1)].copy()
    if len(df)==0:
        return df

    bias = df["Analyst_Bias"].to_numpy(dtype=float)
    indus_dummy = pd.get_dummies(df["L1_INDUSTRY"]).to_numpy(dtype=float)
    mktv = df["mktcap"].to_numpy(dtype=float)
    date_to_reporting = df["Date_to_Reporting"].to_numpy(dtype=float)

    # the industry dummies span the constant, so no separate intercept is added
    model = sm.OLS(bias, np.column_stack([indus_dummy, mktv, date_to_reporting]))
    result = model.fit()
    df["adj_analyst_bias"] = result.resid

    return df

# Placeholder column; Regress_by_Year fills it with the regression residuals.
df_1y_est["adj_analyst_bias"]=np.nan
df_1y_est=(df_1y_est
           .groupby("REPORTING_PERIOD",as_index=False)
           .apply(Regress_by_Year)
           .reset_index(drop=True))

# An analyst's credibility for year Y+1 is the mean adjusted bias of their
# forecasts for year Y; fewer than 5 forecasts gives NaN (too little history).
def _mean_bias_if_enough(bias):
    if len(bias) < 5:
        return np.nan
    return np.mean(bias)

analyst_avg_bias=(df_1y_est
                  .groupby(["REPORTING_PERIOD","ANALYST_NAME"])["adj_analyst_bias"]
                  .apply(_mean_bias_if_enough)
                  .to_frame(name="analyst_avg_bias")
                  .reset_index())
analyst_avg_bias["next_reporting_period"]=analyst_avg_bias["REPORTING_PERIOD"].map(lambda period:str(int(period)+10000))
analyst_avg_bias=analyst_avg_bias.drop(columns=["REPORTING_PERIOD"])

# Reporting period whose credibility applies on a report date: from May onward
# the current year's period (i.e. the bias of last year), before that the
# previous year's — presumably because annual reports are out by end of April.
def _announced_period(data_date):
    year = int(data_date[0:4])
    if data_date[4:6] <= '04':
        year -= 1
    return str(year)+"1231"

df_est["ANN_PERIOD"]=df_est["DataDate"].map(_announced_period)
df_est=pd.merge(df_est,analyst_avg_bias,left_on=["ANALYST_NAME","ANN_PERIOD"],right_on=["ANALYST_NAME","next_reporting_period"],how="left")

## 将数据按股票代码存到文件夹下面

In [None]:
# One CSV per stock so the per-stock pipeline can load only what it needs.
# An explicit loop (instead of groupby.apply for side effects) takes the stock
# code from the group key rather than relying on the grouping column being
# passed to apply, which pandas >= 2.2 deprecates.
for stock_id, stock_df in df_est.groupby("sid"):
    stock_df.to_csv(r"/share/intern_share/analyst_est_data/analyst_est_data_{}.csv".format(stock_id), index=False)

* 下面是针对每一只股票进行的计算

## 计算个股一致预期盈利数据

In [None]:
# Consensus construction (as implemented in Process_Single_StkPeriod / con_est_calcu):
# - use the reports of the past 90 days, keeping only each analyst's latest report;
#   if fewer than 10 remain, use all reports of the past 180 days instead;
#   with no reports at all the consensus is NaN;
# - weight reports by recency and by the analyst's previous-year bias
#   (more recent and less biased -> larger weight).

# Every calendar day (YYYYMMDD strings); list positions are used to measure day
# distances. It must cover every report/announcement date used later: the data is
# queried up to 2020-12-31, and a date missing from the calendar makes .index()
# raise ValueError.
CALENDAR_END = "2020-12-31"
date_calendar=pd.date_range("{}-01-01".format(START_YEAR-1),CALENDAR_END).strftime("%Y%m%d").tolist()

# Consensus estimate for one date: reports are weighted both by each analyst's
# previous-year bias and by how recent the report is.
def con_est_calcu(est_data,date,date_calendar):
    """
    Weighted consensus EPS from the analyst reports in ``est_data``.

    Analyst weight maps the previous-year bias ``analyst_avg_bias`` linearly from
    3 (smallest bias) to 1 (largest bias); a NaN bias gets 1. Weights are uniform
    when all biases are equal or the min/max bias is NaN. The recency weight decays
    with a 45-day half-life. Estimates are clipped to median +/- 3 * (1.483 * MAD),
    and with at least 3 estimates the highest and the lowest are dropped.

    :param est_data: DataFrame with columns analyst_avg_bias, DataDate (YYYYMMDD
        str present in date_calendar) and EST_EPS_DILUTED. It is not modified.
    :param date: evaluation date, YYYYMMDD str present in date_calendar
    :param date_calendar: list of YYYYMMDD strings; the day distance is the
        difference of list positions
    :return: consensus EPS (float), or np.nan when est_data is empty
    """
    if len(est_data) ==0:
        return np.nan

    # analyst dimension (parameters can be tuned)
    analyst_bias=est_data["analyst_avg_bias"]
    max_bias=np.nanmax(analyst_bias)
    min_bias=np.nanmin(analyst_bias)
    if max_bias==min_bias or (math.isnan(min_bias) or math.isnan(max_bias)):
        num = len(analyst_bias)
        weight_analyst=np.array([1/num]*num)
    else:
        weight_analyst=analyst_bias.apply(lambda x:1 if math.isnan(x) else 3-2*(x-min_bias)/(max_bias-min_bias)).to_numpy(dtype=float)
    weight_analyst=weight_analyst/np.sum(weight_analyst)

    # time dimension: 45-day half-life (parameter can be tuned).
    # Day distances go into a local array rather than a new column, so the
    # caller's frame is not mutated (assigning into a filtered slice also raised
    # SettingWithCopyWarning); the position of `date` is looked up only once.
    date_pos=date_calendar.index(date)
    date_to_now=np.array([date_pos-date_calendar.index(d) for d in est_data["DataDate"]])
    weight_date=np.power(0.5**(1/45),date_to_now)
    weight_date=weight_date/np.sum(weight_date)

    total_weight=weight_analyst*weight_date
    total_weight=np.array(total_weight/np.sum(total_weight))

    # median / MAD clipping of extreme estimates
    est_eps=est_data["EST_EPS_DILUTED"].values
    median = np.median(est_eps)
    MAD =  1.483 * np.median(np.abs(est_eps - median))
    est_eps = np.clip(est_eps,median-3*MAD,median+3*MAD)

    # with at least 3 estimates, drop one highest and one lowest
    if len(est_eps)>=3:
        max_index = np.argmax(est_eps)
        min_index = np.argmin(est_eps)
        est_eps = np.delete(est_eps,[max_index,min_index])
        total_weight = np.delete(total_weight,[max_index,min_index])
        total_weight = total_weight/np.sum(total_weight)

    con_est_eps=np.sum(np.array(est_eps)*total_weight)

    return con_est_eps

def _reports_in_window(df, window):
    """Rows of df whose DataDate index is in `window` and that have an EPS estimate, in date order."""
    # df.loc[list] raises KeyError in pandas >= 1.0 when labels are missing (most
    # calendar days have no report); isin + a stable sort selects the same rows in
    # the same order that .loc[list] produced on older pandas.
    in_window = df[df.index.isin(window)].sort_index(kind="mergesort")
    return in_window[pd.notnull(in_window["EST_EPS_DILUTED"])]


def Process_Single_StkPeriod(df):
    """
    Daily consensus EPS for one (stock, reporting period) group.

    :param df: analyst reports indexed by DataDate (YYYYMMDD str) with columns
        EST_EPS_DILUTED, ANALYST_NAME, DataDate and analyst_avg_bias
    :return: DataFrame indexed by each distinct report date (ascending) with a
        single column ``con_est_eps`` computed by con_est_calcu
    """
    date_ls=sorted(set(df.index))

    single_stkperiod_result=pd.DataFrame(index=date_ls,columns=["con_est_eps"])
    for date in date_ls:
        date_pos=date_calendar.index(date)

        # past 90 days including today; only each analyst's latest report
        est_data=_reports_in_window(df,date_calendar[date_pos-89:date_pos+1])
        est_data=est_data.groupby("ANALYST_NAME").apply(lambda x:x.iloc[-1])

        # fewer than 10 reports: use the past 180 days instead (parameter can be tuned).
        # NOTE(review): unlike the 90-day window, this fallback keeps every report,
        # not only each analyst's latest one — confirm that is intended.
        if len(est_data)<10:
            est_data=_reports_in_window(df,date_calendar[date_pos-179:date_pos+1])

        single_stkperiod_result.loc[date,"con_est_eps"]=con_est_calcu(est_data,date,date_calendar)

    return single_stkperiod_result

In [None]:
def get_con_est_eps(sid):
    """
    Daily consensus EPS of one stock per reporting period, aligned to trading days.

    Reads the per-stock report CSV, runs Process_Single_StkPeriod for every
    (sid, REPORTING_PERIOD), treats zeros as missing and forward-fills them, then
    reindexes each period onto the trading calendar {START_YEAR}0101-20200310
    (forward-filled; rows with any NaN dropped).

    :param sid: stock code, e.g. 000001.SZ
    :return: DataFrame with columns including sid, REPORTING_PERIOD, DataDate and con_est_eps
    """
    csv_path = r"/share/intern_share/analyst_est_data/analyst_est_data_{}.csv".format(sid)
    reports = pd.read_csv(csv_path, dtype={"DataDate": str, "REPORTING_PERIOD": str})
    reports = reports.set_index("DataDate")
    # DataDate is kept both as the index (date-window lookups) and as a column
    reports["DataDate"] = reports.index

    per_period = reports.groupby(["sid", "REPORTING_PERIOD"]).apply(Process_Single_StkPeriod)
    per_period = per_period.reset_index().rename(columns={"level_0": "sid", "level_1": "REPORTING_PERIOD", "level_2": "DataDate"})

    # trading days used to align the calendar-day consensus series
    trade_calendar = [str(day) for day in get_trade_dates("{}0101".format(START_YEAR), "20200310")]

    def _ffill_nonzero(group):
        return group.replace(0, np.nan).ffill()

    def _align_to_trading_days(group):
        return (group.drop_duplicates("DataDate")
                     .set_index("DataDate")
                     .reindex(trade_calendar, method="ffill")
                     .dropna()
                     .reset_index())

    keys = ["sid", "REPORTING_PERIOD"]
    per_period = per_period.groupby(keys, as_index=False).apply(_ffill_nonzero)
    per_period = per_period.groupby(keys, as_index=False).apply(_align_to_trading_days)
    return per_period.reset_index(drop=True)

## 对一致预期值进行调整

In [None]:
def get_adjust_data(sid,conn):
    """
    Fetch, for one stock, the company disclosures used to override the consensus.

    All queries use bound parameters instead of formatting the stock code into
    the SQL text.

    :param sid: stock code, e.g. 000001.SZ
    :param conn: DB-API connection (pymysql, %s placeholders)
    :return: (df_ProfitExpress, df_ProfitNotice, df_FA_EPS, df_now_tot_share):
        express reports, earnings notices and financial indicators announced
        between {START_YEAR}0101 and 20201231 (columns renamed to
        sid / REPORTING_PERIOD), plus the latest total-share record
    """
    start_dt = "{}0101".format(START_YEAR)
    end_dt = "20201231"

    # express reports (业绩快报)
    sql_express = """SELECT S_INFO_WINDCODE, ANN_DT, REPORT_PERIOD,NET_PROFIT_EXCL_MIN_INT_INC
    from ASHAREPROFITEXPRESS where S_INFO_WINDCODE = %s and ANN_DT between %s and %s"""
    df_ProfitExpress = pd.read_sql_query(sql_express, conn, params=(sid, start_dt, end_dt))
    df_ProfitExpress.rename(columns={"S_INFO_WINDCODE":"sid","REPORT_PERIOD":"REPORTING_PERIOD"},inplace=True)

    # earnings notices (业绩预告)
    sql_notice = """SELECT S_INFO_WINDCODE, S_PROFITNOTICE_DATE, S_PROFITNOTICE_PERIOD,S_PROFITNOTICE_NETPROFITMIN,S_PROFITNOTICE_NETPROFITMAX
    from ASHAREPROFITNOTICE where S_INFO_WINDCODE = %s and S_PROFITNOTICE_DATE between %s and %s"""
    df_ProfitNotice = pd.read_sql_query(sql_notice, conn, params=(sid, start_dt, end_dt))
    df_ProfitNotice.rename(columns={"S_INFO_WINDCODE":"sid","S_PROFITNOTICE_PERIOD":"REPORTING_PERIOD"},inplace=True)

    # published financial indicators
    sql_fa = """SELECT S_INFO_WINDCODE, ANN_DT, REPORT_PERIOD,S_FA_DEDUCTEDPROFIT,S_FA_EXTRAORDINARY
    from ASHAREFINANCIALINDICATOR where S_INFO_WINDCODE = %s and ANN_DT between %s and %s"""
    df_FA_EPS = pd.read_sql_query(sql_fa, conn, params=(sid, start_dt, end_dt))
    df_FA_EPS.rename(columns={"S_INFO_WINDCODE":"sid","REPORT_PERIOD":"REPORTING_PERIOD"},inplace=True)

    # total share history; the original query upper-cased the whole statement
    # (stock code included), so the bound code is upper-cased as well
    sql_shares = "SELECT S_INFO_WINDCODE,TOT_SHR,CHANGE_DT FROM ASHARECAPITALIZATION WHERE S_INFO_WINDCODE = %s"
    df_tot_share = pd.read_sql_query(sql_shares, conn, params=(sid.upper(),))
    df_tot_share.rename(columns={"S_INFO_WINDCODE":"sid"},inplace=True)
    df_tot_share=df_tot_share.sort_values(["sid","CHANGE_DT"])
    # latest record per stock (rows are sorted by CHANGE_DT)
    df_now_tot_share=df_tot_share.groupby('sid',as_index=False).apply(lambda x:x.iloc[-1])

    return df_ProfitExpress,df_ProfitNotice,df_FA_EPS,df_now_tot_share

In [None]:
def get_adjusted_con_est_eps(sid,conn,file_path="result//"):
    """
    Build the adjusted consensus EPS series of one stock and write it to CSV.

    The analyst consensus from get_con_est_eps(sid) is overridden, from the
    calendar day before each announcement onward, by (applied in this order, so
    later sources win on overlapping dates) the earnings-notice EPS (midpoint of
    the announced profit range), the express-report EPS and the published
    financial-indicator EPS. Zeros are then treated as missing and forward-filled
    within each (sid, REPORTING_PERIOD).

    params:
    sid: the stock code, 000001.SZ for example
    conn: the sql connection
    file_path: prefix of the output path; the file written is
        file_path + "con_eps_my_{sid}.csv"

    function:
    generate and store the adjusted consensus estimate eps file
    """
    con_est_eps_df = get_con_est_eps(sid)
    df_ProfitExpress,df_ProfitNotice,df_FA_EPS,df_now_tot_share = get_adjust_data(sid,conn)

    def _total_shares(group_sid):
        # latest total share count (same share base as the consensus EPS)
        return df_now_tot_share.set_index('sid').loc[group_sid,'TOT_SHR']

    def _overwrite_from(df, ann_date, value):
        # NOTE(review): the override starts one calendar day *before* ann_date;
        # confirm this is intended and not a one-day look-ahead.
        try:
            start_date = date_calendar[date_calendar.index(ann_date)-1]
        except ValueError:
            # ann_date is NaN (no announcement for this period) or outside date_calendar
            return df
        # .loc instead of chained assignment: df["col"][mask] = v does not write
        # back under pandas copy-on-write and raises SettingWithCopyWarning
        df.loc[df["DataDate"]>=start_date,"con_est_eps"] = value
        return df

    def Fill_with_Notice(df):
        # earnings notice: midpoint of the announced net-profit range.
        # NOTE(review): unlike the express / indicator figures this is not divided
        # by 10000 — presumably a unit difference between the Wind tables; confirm.
        totshare = _total_shares(df["sid"].iloc[0])
        try:
            notice_eps = (df["S_PROFITNOTICE_NETPROFITMIN"].iloc[0]+df["S_PROFITNOTICE_NETPROFITMAX"].iloc[0])/2/totshare
        except TypeError:
            # non-numeric (e.g. None) profit bounds: keep the consensus unchanged
            return df
        return _overwrite_from(df, df["S_PROFITNOTICE_DATE"].iloc[0], notice_eps)

    def Fill_with_Express(df):
        totshare = _total_shares(df["sid"].iloc[0])
        express_eps = df["NET_PROFIT_EXCL_MIN_INT_INC"].iloc[0]/totshare/10000
        return _overwrite_from(df, df["ANN_DT"].iloc[0], express_eps)

    def Fill_with_FA(df):
        # once the formal report is out, its EPS replaces the consensus
        totshare = _total_shares(df["sid"].iloc[0])
        FA_eps = (df['S_FA_DEDUCTEDPROFIT'].iloc[0]+df['S_FA_EXTRAORDINARY'].iloc[0])/totshare/10000
        return _overwrite_from(df, df["ANN_DT"].iloc[0], FA_eps)

    group_cols = ["sid","REPORTING_PERIOD"]
    keep_cols = ["sid","REPORTING_PERIOD","DataDate","con_est_eps"]

    # group_keys=False keeps the row index flat; otherwise pandas >= 2 prepends
    # sid / REPORTING_PERIOD index levels and the next groupby on those names
    # fails with an "ambiguous" ValueError.
    merged = pd.merge(con_est_eps_df,df_ProfitNotice,on=group_cols,how="left")
    merged = merged.groupby(group_cols,group_keys=False).apply(Fill_with_Notice)

    merged = pd.merge(merged,df_ProfitExpress,on=group_cols,how="left")
    merged = merged.groupby(group_cols,group_keys=False).apply(Fill_with_Express)
    # drop the notice / express columns (both sources have an ANN_DT column)
    merged = merged[keep_cols]

    merged = pd.merge(merged,df_FA_EPS,on=group_cols,how="left")
    merged = merged.groupby(group_cols,group_keys=False).apply(Fill_with_FA)

    # store: zeros are treated as missing and forward-filled per period
    merged = merged.groupby(group_cols,as_index=False)\
                   .apply(lambda x:x.replace(0,np.nan).ffill())
    merged = merged[keep_cols]
    merged.to_csv(file_path+"con_eps_my_{}.csv".format(sid),index=False)

## 获取一致预期值、股票收盘价、PE值并merge到一个dataframe中

In [None]:
def get_merged_eps_pe_price(sid,conn):
    """
    Merge the adjusted consensus EPS of one stock with its prices and valuation data.

    :param sid: stock code, e.g. 000001.SZ; result/con_eps_my_{sid}.csv must exist
        (it is written by get_adjusted_con_est_eps)
    :param conn: database connection
    :return: one row per (REPORTING_PERIOD, DataDate) with DataDate after
        {START_YEAR}1101: sid, REPORTING_PERIOD, DataDate, con_est_eps, the
        forward-adjusted close (S_DQ_ADJCLOSE), S_DQ_ADJFACTOR, S_VAL_PE_TTM,
        S_DQ_MV, S_VAL_PB_NEW and pe_2y_rollingmedian
    """
    # consensus data; zeros are treated as missing and forward-filled per period
    con_expect_data=pd.read_csv(r"result/con_eps_my_{}.csv".format(sid),dtype={"DataDate":str,"REPORTING_PERIOD":str})
    con_expect_data=con_expect_data.groupby(["sid","REPORTING_PERIOD"],as_index=False)\
                        .apply(lambda x:x.replace(0,np.nan).ffill())
    con_expect_data=con_expect_data[["sid","REPORTING_PERIOD","DataDate","con_est_eps"]]

    stock_pool_est_eps=con_expect_data[con_expect_data["DataDate"]>"{}1101".format(START_YEAR)].reset_index(drop=True)

    # close prices, rescaled by the latest adjustment factor (forward-adjusted)
    sql = """SELECT S_INFO_WINDCODE, TRADE_DT, S_DQ_ADJCLOSE,S_DQ_ADJFACTOR  from ASHAREEODPRICES 
    where S_INFO_WINDCODE = '{}' and  TRADE_DT between '{}0101' and '20201231'""".format(sid,START_YEAR+1)
    df_close_hfq = pd.read_sql_query(sql, conn)
    df_close_hfq=df_close_hfq.rename(columns={"S_INFO_WINDCODE":"sid","TRADE_DT":"DataDate"})
    df_close_hfq=df_close_hfq.sort_values(["sid","DataDate"])
    df_close_hfq["S_DQ_ADJCLOSE"]=df_close_hfq["S_DQ_ADJCLOSE"]/df_close_hfq["S_DQ_ADJFACTOR"].iloc[-1]

    stock_pool_est_eps=pd.merge(stock_pool_est_eps,df_close_hfq,on=["sid","DataDate"],how="left")

    # PE history used to derive the "reasonable" PE level
    sql = """SELECT S_INFO_WINDCODE, TRADE_DT, S_VAL_PE_TTM,S_DQ_MV,S_VAL_PB_NEW from ASHAREEODDERIVATIVEINDICATOR
    where S_INFO_WINDCODE = '{}' and TRADE_DT between '{}0101' and '20201231'""".format(sid,START_YEAR-1)
    df_PETTM = pd.read_sql_query(sql, conn)
    df_PETTM=df_PETTM.rename(columns={"S_INFO_WINDCODE":"sid","TRADE_DT":"DataDate"})
    df_PETTM=df_PETTM.sort_values(["sid","DataDate"])

    # parameters can be tuned
    def Rolling_Median(df):
        # NOTE(review): bfill fills a missing PE with the *next* available value,
        # i.e. a small look-ahead; confirm this is intended.
        df["S_VAL_PE_TTM"]=df["S_VAL_PE_TTM"].bfill()
        # Despite the column name, this is the rolling 40th percentile over the
        # last 252*3 rows (about three years of trading days), not a 2-year median.
        df["pe_2y_rollingmedian"]=df["S_VAL_PE_TTM"].rolling(252*3).apply(lambda x:np.percentile(x,40),raw=True)
        return df

    df_PETTM_2y_rollingmedian=Rolling_Median(df_PETTM)
    stock_pool_est_eps=pd.merge(stock_pool_est_eps,df_PETTM_2y_rollingmedian,on=["sid","DataDate"],how="left")
    return stock_pool_est_eps

# 进行画图

### 新版本

In [None]:
#analyse the performance of the strategy on single stock
def get_prices_v1(daily_df):
    """
    PE-implied prices for one date, smoothed towards the next fiscal year.

    :param daily_df: rows of one DataDate with columns S_DQ_ADJCLOSE, DataDate
        (YYYYMMDD str), REPORTING_PERIOD (YYYY1231 str), con_est_eps and
        pe_2y_rollingmedian
    :return: one-row DataFrame with p0, p1, p2 (smoothed implied prices), close
        and pe_rollingmedian (the PE of the last year looked up, or NaN)
    """
    close=daily_df["S_DQ_ADJCLOSE"].values[0]
    date=daily_df["DataDate"].values[0]

    # implied price (EPS * PE) for fiscal years Y-1 .. Y+2; 0 when no estimate
    prices=np.array([0,0,0,0],dtype=float)
    pe=np.nan
    for i in range(4):
        reporting_period=str(int(date[0:4])+i-1)+"1231"
        rows=daily_df[daily_df["REPORTING_PERIOD"]==reporting_period]
        try:
            con_est_eps=rows["con_est_eps"].values[0]
            pe=rows["pe_2y_rollingmedian"].values[0]
        except IndexError:
            pe = np.nan
            continue
        prices[i]=con_est_eps*pe

    # year Y+1 missing: extrapolate linearly
    if prices[2]==0:
        prices[2]= prices[1]+(prices[1]-prices[0])

    # non-monotonic path (p0/p1 may be inflated by a large one-off gain)
    if prices[0]>prices[1] and prices[1]<prices[2]:
        prices[0] = prices[1]-(prices[2]-prices[1])
    elif prices[1]>prices[2] and prices[0]<prices[2]:
        prices[1] = (prices[0]+prices[2])/2

    # year Y+2 missing: extrapolate linearly
    if prices[3] ==0:
        prices[3]=prices[2]+(prices[2]-prices[1])

    # roll each price towards the following year in proportion to the elapsed
    # part of the year (30/360 convention). Late-December dates give up to
    # 11*30+31 = 361 days, which would make the weight of the current year
    # negative, so the day count is capped at 360.
    month , day =( int(date[4:6]), int(date[6:8]) )
    day_num = min((month-1)*30 +day, 360)
    prices_smooth = np.array([0,0,0],dtype=float)
    for i in range(3):
        prices_smooth[i] = (prices[i]*(360-day_num)+prices[i+1]*day_num)/360

    return pd.DataFrame(data=[[prices_smooth[0],prices_smooth[1],prices_smooth[2],close,pe]],columns=["p0","p1","p2","close","pe_rollingmedian"])


def single_stock_plotting_v1(sid,conn):
    """
    params:
    sid: the stock code, 000001.SZ for example
    conn: the sql connection

    function:
    plot the close_price and the con_est_prices (p0,p1,p2) computed by get_prices_v1
    """
    df = get_merged_eps_pe_price(sid,conn)
    sid=df["sid"][df.index[0]]

    df=df.groupby("DataDate").apply(get_prices_v1)
    df=df.reset_index().set_index("DataDate")
    del df["level_1"]
    df.index=pd.to_datetime(df.index)

    plotted_cols = ["p0","p1","p2","close"]
    plt.plot(df[plotted_cols])
    # plt.plot(DataFrame) attaches no line labels, so a bare plt.legend() draws
    # an empty legend; name the lines explicitly.
    plt.legend(plotted_cols)
    # rotate after plotting so the rotation applies to the final tick labels
    plt.xticks(rotation=90)
    plt.grid()
    plt.title(sid+" close_price and the con_est_prices")
    plt.show()



### 原版本

In [None]:
#analyse the performance of the strategy on single stock
def get_prices_v2(daily_df,column = "con_est_eps"):
    """
    Per-date estimates for fiscal years Y-1 .. Y+2 plus price ratios.

    :param daily_df: rows of one DataDate with columns S_DQ_ADJCLOSE, DataDate
        (YYYYMMDD str), REPORTING_PERIOD (YYYY1231 str), S_VAL_PB_NEW and `column`
    :param column: per-period value to read; the gap-filling / smoothing rules
        only apply when it is "con_est_eps"
    :return: one-row DataFrame: est_eps0..est_eps3, close, price/est1 (close
        divided by the year-Y value) and pb
    """
    close = daily_df["S_DQ_ADJCLOSE"].values[0]
    date = daily_df["DataDate"].values[0]
    first_year = int(date[0:4]) - 1

    # 0 = no estimate for that year; negative estimates become NaN
    est_eps_arr = np.zeros(4, dtype=float)
    for offset in range(4):
        period = str(first_year + offset) + "1231"
        matches = daily_df.loc[daily_df["REPORTING_PERIOD"] == period, column].values
        if len(matches) == 0:
            continue
        est_eps_arr[offset] = matches[0]
        if est_eps_arr[offset] < 0:
            est_eps_arr[offset] = np.nan

    if column == "con_est_eps":
        # year Y+1 missing: extrapolate linearly from Y-1 and Y
        if est_eps_arr[2] == 0:
            est_eps_arr[2] = est_eps_arr[1] + (est_eps_arr[1] - est_eps_arr[0])

        # non-monotonic path (Y-1 / Y may be inflated by a large one-off gain)
        if est_eps_arr[0] > est_eps_arr[1] and est_eps_arr[1] < est_eps_arr[2]:
            est_eps_arr[0] = est_eps_arr[1] - (est_eps_arr[2] - est_eps_arr[1])
        elif est_eps_arr[1] > est_eps_arr[2] and est_eps_arr[0] < est_eps_arr[2]:
            est_eps_arr[1] = (est_eps_arr[0] + est_eps_arr[2]) / 2

        # Y+2 is extrapolated linearly whenever Y is at least half of Y+1,
        # overwriting any existing Y+2 value.
        # NOTE(review): the original comment said "only when Y+2 is empty", which
        # the condition does not check — confirm the intended rule.
        if est_eps_arr[1] >= est_eps_arr[2] * 0.5:
            est_eps_arr[3] = est_eps_arr[2] + (est_eps_arr[2] - est_eps_arr[1])

    pb = daily_df["S_VAL_PB_NEW"].iloc[0]
    row = [est_eps_arr[0], est_eps_arr[1], est_eps_arr[2], est_eps_arr[3], close, close / est_eps_arr[1], pb]
    return pd.DataFrame(data=[row],
                        columns=["est_eps0","est_eps1","est_eps2","est_eps3","close","price/est1","pb"])
    
    

def single_stock_plotting_v2(sid,conn,consecutive_rolling,use_pe = 0,start_date ="20160101",end_date = "20200310",pdf=None):
    """
    Plot price/EPS-implied price levels against the close price for one stock.

    params:
    sid: the stock code, 000001.SZ for example
    conn: the sql connection
    consecutive_rolling: if True, the "bottom" price/EPS multiple is the rolling
        252-row 40th percentile of price/est1; otherwise, per calendar year, the
        10th percentile of that year's price/est1 (skipping the year's first 80
        rows when it has more than 80).
        NOTE(review): the per-year bottom uses the whole year's data, i.e. it is
        in-sample for the plotted year.
    use_pe: if non-zero, a fixed multiple used instead of the computed bottom
    start_date, end_date: YYYYMMDD plotting window; data from January 1st of the
        year before start_date is used to warm up the statistics
    pdf: optional matplotlib PdfPages receiving both figures. When None, a
        notebook-level ``industry_plotting`` is used if it exists (the previous
        behaviour); otherwise the figures are only shown.
        NOTE(review): a PdfPages that has already been closed is not detected.

    function:
    plot the close_price and the con_est_prices (p0,p1,p2,p3)
    """
    if pdf is None:
        pdf = globals().get("industry_plotting")

    df = get_merged_eps_pe_price(sid,conn)
    sid = df["sid"].iloc[0]

    warmup_start = str(int(start_date[0:4])-1)+"0101"
    df = df[(df["DataDate"] >= warmup_start) & (df["DataDate"] <= end_date)]

    df = df.groupby("DataDate").apply(lambda x:get_prices_v2(x))
    df = df.reset_index().drop(["level_1"],axis = 1)

    if consecutive_rolling:
        # continuous rolling percentile
        df["price/est_bottom"] = df["price/est1"].rolling(252).apply(lambda x:np.percentile(x,40),raw=True)
    else:
        # discrete, one level per calendar year
        df["year"] = df["DataDate"].apply(lambda x:int(x[0:4]))

        def get_bottom(x):
            return np.percentile(x.iloc[80:],10) if len(x)>80 else np.percentile(x,10)

        df_pe_est_bottom = df["price/est1"].groupby(df["year"]).apply(get_bottom).to_frame("price/est_bottom")
        df_pe_est_bottom = df_pe_est_bottom.reset_index()
        df = pd.merge(df,df_pe_est_bottom,on = "year",how = "left")

    df = df[df["DataDate"]>=start_date]

    df = df.set_index("DataDate")
    df.index = pd.to_datetime(df.index)

    if use_pe == 0:
        multiple = df["price/est_bottom"].values
    else:
        df["price/est_bottom"] = use_pe
        multiple = use_pe
    for i in range(4):
        df["p"+str(i)] = multiple*df["est_eps"+str(i)].values

    # cap extreme ratios (EPS near zero) so the chart stays readable
    df["price/est1"] = df["price/est1"].clip(upper=200)
    df[["price/est1","price/est_bottom"]].plot()
    plt.legend()
    plt.grid()
    if pdf is not None:
        pdf.savefig()
    plt.show()

    df[["p0","p1","p2","p3","close"]].plot()
    plt.legend()
    plt.grid()
    plt.title(sid+" close_price and the con_est_prices")
    if pdf is not None:
        pdf.savefig()
    plt.show()


# 银行PB定价

In [None]:
def single_stock_plotting_pb(sid,conn,use_pb,start_date ="20160101",end_date = "20200310"):
    """
    PB-based pricing chart for one stock (used for banks).

    params:
    sid: the stock code, 601398.SH for example
    conn: the sql connection
    use_pb: PB multiple treated as the valuation bottom; applied to the consensus
        median BPS (S_EST_MEDIANBPS) of fiscal years Y-1 .. Y+2
    start_date, end_date: YYYYMMDD window; data from January 1st of the year
        before start_date is kept

    function:
    plot the actual PB against use_pb, and the BPS-implied prices
    (use_pb * BPS, 120-row rolling mean) against the close price

    return: the per-date DataFrame used for the plots, indexed by DataDate
    """
    sql = """select S_INFO_WINDCODE,EST_DT,EST_REPORT_DT,S_EST_AVGBPS,S_EST_MEDIANBPS from AShareConsensusData 
    where S_INFO_WINDCODE  = '{}' and  EST_DT between "20100101" and "20200310" """.format(sid).upper()

    df = pd.read_sql_query(sql,conn)
    df = df.rename(columns={"S_INFO_WINDCODE":"sid","EST_DT":"DataDate","EST_REPORT_DT":"REPORTING_PERIOD"})
    sid = df["sid"].iloc[0]

    # close prices, rescaled by the latest adjustment factor (forward-adjusted)
    sql = """SELECT S_INFO_WINDCODE, TRADE_DT, S_DQ_ADJCLOSE,S_DQ_ADJFACTOR  from ASHAREEODPRICES 
    where S_INFO_WINDCODE = '{}' and  TRADE_DT between '{}0101' and '20201231'""".format(sid,2010)
    df_close_hfq = pd.read_sql_query(sql, conn)
    df_close_hfq = df_close_hfq.rename(columns={"S_INFO_WINDCODE":"sid","TRADE_DT":"DataDate"})
    df_close_hfq = df_close_hfq.sort_values(["sid","DataDate"])
    df_close_hfq["S_DQ_ADJCLOSE"] = df_close_hfq["S_DQ_ADJCLOSE"]/df_close_hfq["S_DQ_ADJFACTOR"].iloc[-1]
    df = pd.merge(df,df_close_hfq,on=["sid","DataDate"],how="outer")

    # PB series
    sql = """SELECT S_INFO_WINDCODE, TRADE_DT,S_VAL_PB_NEW from ASHAREEODDERIVATIVEINDICATOR
    where S_INFO_WINDCODE = '{}' and TRADE_DT between '{}0101' and '20201231'""".format(sid,2010)
    df_PB = pd.read_sql_query(sql, conn)
    df_PB = df_PB.rename(columns={"S_INFO_WINDCODE":"sid","TRADE_DT":"DataDate"})
    df_PB = df_PB.sort_values(["sid","DataDate"])
    df = pd.merge(df,df_PB,on=["sid","DataDate"],how="outer")

    df = df[(df["DataDate"] >= str(int(start_date[0:4])-1)+"0101") & (df["DataDate"] <= end_date)]

    df = df.groupby("DataDate").apply(lambda x:get_prices_v2(x,column="S_EST_MEDIANBPS"))
    df = df.reset_index().drop(["level_1"],axis = 1)

    # zeros mean "no estimate"; carry the last known values forward in date order
    df = df.replace(0.0,np.nan)
    df = df.sort_values(["DataDate"]).ffill()

    # years Y-1 and Y+2 are always rebuilt by linear extrapolation from Y and Y+1
    df["est_eps0"] = df["est_eps1"] - (df["est_eps2"] - df["est_eps1"])
    df["est_eps3"] = df["est_eps2"] + (df["est_eps2"] - df["est_eps1"])

    # implied price = use_pb * BPS, smoothed with a 120-row rolling mean
    price_cols = ["est_eps0","est_eps1","est_eps2","est_eps3"]
    df[price_cols] = (use_pb * df[price_cols]).rolling(120).apply(np.nanmean,raw=True)

    df = df.set_index("DataDate")
    df.index = pd.to_datetime(df.index)

    df["pb_bottom"] = use_pb
    df[["pb","pb_bottom"]].plot()
    plt.legend()
    plt.grid()
    plt.title(sid+" PB and PB bottom")
    plt.show()

    df[price_cols+["close"]].plot()
    plt.legend()
    plt.grid()
    plt.title(sid+" close_price and the PB-implied prices")
    plt.show()

    return df

In [None]:
conn = pymysql.connect(**DB_INFO, charset='utf8mb4', cursorclass=pymysql.cursors.DictCursor)

# Bank stocks priced by PB, each paired (by position) with its PB "bottom" multiple.
bank_code_ls = ["601398.SH", "601988.SH", "601288.SH", "601939.SH", "601328.SH",
                "601166.SH", "600016.SH", "601998.SH", "600015.SH", "601818.SH"]
pb_bottom_ls = [0.8, 0.65, 0.7, 0.8, 0.6, 0.7, 0.6, 0.65, 0.55, 0.7]
for sid, pb_bottom in zip(bank_code_ls, pb_bottom_ls):
    print(sid)
    df = single_stock_plotting_pb(sid, conn, use_pb=pb_bottom, start_date="20140501", end_date="20200310")

# PE定价

In [None]:
# Example usage for one stock (PE-based pricing). Kept as comments so the cell
# neither runs the queries nor echoes a string literal as output.
# get_adjusted_con_est_eps("000001.SZ", conn)                              # build the consensus-estimate file
# single_stock_plotting_v1("000001.SZ", conn)                              # plot, new version
# single_stock_plotting_v2("000001.SZ", conn, consecutive_rolling=False)   # plot, original version (consecutive_rolling is required)

In [None]:
from matplotlib.backends.backend_pdf import PdfPages

In [None]:
# Sheet indices of Top_stocks_each_indus.xlsx to render (one PDF per sheet).
INDUSTRY_SHEETS = range(18,19)
# Stocks excluded from the plots (reason not recorded in the notebook).
SKIP_SIDS = {'600968.SH'}

for i in INDUSTRY_SHEETS:
    conn = pymysql.connect(**DB_INFO, charset='utf8mb4', cursorclass=pymysql.cursors.DictCursor)
    df_indus = pd.read_excel(r"/home/ywang/tempory task/result/Top_stocks_each_indus.xlsx",sheet_name=i)
    # `with` closes the PDF even if an unexpected exception escapes the loop;
    # the name industry_plotting is kept because single_stock_plotting_v2 saves
    # into the global of that name.
    with PdfPages('result//industry_plotting//industry_{}.pdf'.format(i)) as industry_plotting:
        for sid in df_indus["sid"]:
            if sid in SKIP_SIDS:
                continue
            try:
                get_adjusted_con_est_eps(sid,conn)  # build the consensus-estimate file
                single_stock_plotting_v2(sid,conn,consecutive_rolling=False,start_date="20151201",end_date="20200310")
            except ValueError as err:
                # best effort: skip the stock, but report it instead of failing silently
                print("skipped {}: {}".format(sid, err))

In [None]:
# Fresh connection, then rebuild the adjusted consensus-estimate file for 601398.SH.
conn = pymysql.connect(**DB_INFO, charset='utf8mb4', cursorclass=pymysql.cursors.DictCursor)
get_adjusted_con_est_eps("601398.SH", conn)

In [None]:
# Fresh connection, then plot 601398.SH with a fixed PE multiple of 6 instead of
# the computed price/EPS bottom.
# NOTE(review): single_stock_plotting_v2 saves into the global `industry_plotting`;
# if the industry loop has already closed it, this call may fail or write into a
# closed PDF — confirm.
conn = pymysql.connect(**DB_INFO, charset='utf8mb4', cursorclass=pymysql.cursors.DictCursor)
single_stock_plotting_v2("601398.SH", conn,
                         consecutive_rolling=True, use_pe=6,
                         start_date="20160501", end_date="20200310")