In [1]:
# Automatically reload edited project modules (utils.*, models, ...) before running each cell.
%load_ext autoreload
%autoreload 2
# Disable Jedi-based tab completion. NOTE(review): presumably for completion speed/reliability — confirm.
%config Completer.use_jedi = False 

In [2]:
# Env module 
import sys
sys.path.append('../')


from env import os, glob, pdl, pd, msno, trange, tqdm, sleep, timeit, timedelta, copy
from IPython.display import clear_output

from utils.datetimes import start_date, end_date, yesterday_date, today_date, \
    week_ago_date, month_ago_date, biquater_ago_date, bimonth_ago_date, quater_ago_date, \
    biquater_ago_date, triquater_ago_date, trade_day_util as tdu

from utils.calculators import *
from utils.psql_client import load_table, insert_df, load_stock_prices, get_stock_basic
from utils.stock_utils import *
from utils.datasource import *
from data_center import DataCenter
from utils.stock_filter import StockFilter
print(f'Today is {today_date}, Working from {start_date} to {end_date}')

from utils.strategy import *

from models import *

pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', 100)
# 列名与数据对齐显示 (ak?) — align column names with data when cells contain East Asian (wide) characters
pd.set_option('display.unicode.ambiguous_as_wide', True)
pd.set_option('display.unicode.east_asian_width', True)

from __future__ import (absolute_import, division, print_function,
                        unicode_literals)

[2022-04-15T20:24:15.458605+08:00] Enviroment loaded. Working Dir: /Users/tzhu/work/lab/neo_world/notebooks
Today is 2022-04-15, Working from 2021-03-29 to 2022-04-15


In [3]:
# BT module 

import backtrader as bt
import backtrader.indicators as btind
import backtrader.feeds as btfeeds

In [4]:
%matplotlib inline

from utils.plot_plotly import *
from utils.plot_mpl import *
from utils.plot_bokeh import *

In [5]:
# Preview of the daily northbound (北上) net inflow series, indexed by trade date.
(
    ak.stock_hsgt_north_net_flow_in_em(symbol="北上")
    .rename(columns={'date': 'trade_date'})
    .set_index('trade_date')
)

Unnamed: 0_level_0,value
trade_date,Unnamed: 1_level_1
2014-11-17,1300000.00
2014-11-18,484500.00
2014-11-19,261200.00
2014-11-20,227600.00
2014-11-21,234100.00
...,...
2022-04-07,235236.55
2022-04-08,302599.59
2022-04-11,-287765.63
2022-04-12,1234439.13


In [21]:
def init_data(start_date, end_date, expire_days=30, cache_dir='../tmp'):
    """Create a DataCenter for [start_date, end_date] and load its merged price frame.

    The merged frame is cached as feather under ``cache_dir`` with the file name
    ``price_{start_date}_{end_date}_{expire_date}.feather``. A cache file whose expire
    date is today or later is reused; expired cache files are ignored. On a cache miss
    the frame is rebuilt with ``dc.merge_all()`` and written to a new cache file that
    expires ``expire_days`` days from today.

    Args:
        start_date, end_date: window boundaries; passed to DataCenter and formatted
            into the cache file name.
        expire_days: lifetime of a newly written cache file, in days.
        cache_dir: directory holding the cache files (created if missing).

    Returns:
        (dc, df_init): the DataCenter and the price frame indexed by (ts_code, trade_date).
    """
    print(f'Initializing data from {start_date} to {end_date}...')
    dc = DataCenter(start_date, end_date)

    today_str = pdl.today().to_date_string()
    pattern = os.path.join(cache_dir, f'price_{start_date}_{end_date}_*.feather')
    # Files written by this function differ only in the trailing ISO expire date, so a
    # reverse lexical sort tries the cache with the latest expiry first.
    for f in sorted(glob.glob(pattern), reverse=True):
        expire_str = os.path.splitext(os.path.basename(f))[0].rsplit('_', 1)[-1]
        if expire_str < today_str:
            continue  # expired: ignore it and fall through to a rebuild
        print(f'Found cache file: {f}, loading...')
        return dc, pd.read_feather(f).set_index(['ts_code', 'trade_date'])

    df_init = dc.merge_all()
    # cache it
    os.makedirs(cache_dir, exist_ok=True)
    expire_date = pdl.today().add(days=expire_days).to_date_string()
    df_file_path = os.path.join(cache_dir, f'price_{start_date}_{end_date}_{expire_date}.feather')
    df_init.reset_index().to_feather(df_file_path)
    return dc, df_init

In [22]:
def clean_cache_files():
    """Delete expired price cache files from ``../tmp``.

    A ``price_*`` file is kept only when its base name (before the first '.') splits on
    '_' into exactly four parts and the fourth part, parsed and placed in the
    Asia/Shanghai time zone, is not earlier than ``pdl.today()``. Every other
    ``price_*`` file is removed. Removal errors are printed rather than raised.
    """
    for path in glob.glob('../tmp/price_*'):
        base = path.split('/')[-1]
        parts = base.split('.')[0].split('_')
        keep = len(parts) == 4 and pdl.parse(parts[3]).set(tz='Asia/Shanghai') >= pdl.today()
        if keep:
            print(f'Preserving recent file {base}')
            continue
        print(f'Deleting old file {base}')
        try:
            os.remove(path)
        except OSError as err:
            print("Error: %s : %s" % (path, err.strerror))

In [23]:
def cache_data(df, name='custom', expire_days=30, cache_dir='../tmp'):
    """Write ``df`` to a feather cache file and return the file path.

    The file is ``{cache_dir}/{name}_{start}_{end}_{expire}.feather``. ``start`` and
    ``end`` are the earliest and latest values of the second index level, formatted as
    ``%Y-%m-%d``. That level is assumed to be ``trade_date`` of a (ts_code, trade_date)
    index, which is how ``read_cache`` restores it. ``expire`` is today + ``expire_days``.
    """
    expire_date = pdl.today().add(days=expire_days).to_date_string()
    # Use the true min/max date. On a (ts_code, trade_date) index the first and last rows
    # only give the first stock's first date and the last stock's last date.
    trade_dates = df.index.get_level_values(1)
    start_date = trade_dates.min().strftime('%Y-%m-%d')
    end_date = trade_dates.max().strftime('%Y-%m-%d')
    os.makedirs(cache_dir, exist_ok=True)
    df_file_path = os.path.join(cache_dir, f'{name}_{start_date}_{end_date}_{expire_date}.feather')
    df.reset_index().to_feather(df_file_path)
    return df_file_path

    
def read_cache(name='custom', start_date='*', end_date='*', cache_dir='../tmp'):
    """Load a frame written by ``cache_data`` and restore its (ts_code, trade_date) index.

    ``start_date`` and ``end_date`` are glob fragments; the default ``'*'`` matches any
    date. When several files match, the lexically greatest path is loaded. For ISO-dated
    names this means the latest start date, then end date, then expire date.

    Raises:
        FileNotFoundError: if no cache file matches (previously an UnboundLocalError).
    """
    pattern = os.path.join(cache_dir, f'{name}_{start_date}_{end_date}_*.feather')
    matches = sorted(glob.glob(pattern), reverse=True)
    if not matches:
        raise FileNotFoundError(f'No cache file matches {pattern}')
    f = matches[0]
    print(f'Found cache file: {f}, loading...')
    return pd.read_feather(f).set_index(['ts_code', 'trade_date'])

In [24]:
def stock_summary(df, ts_code, end_date):
    """Print a one-line summary of one stock on ``end_date``.

    ``ts_code`` may be a 6- or 9-character code. Any other length is treated as a stock
    name and resolved with ``get_ts_code_from_name``. The printed auction turnover
    (竞价成交) is the ``成交金额`` of the first tick returned by
    ``ak.stock_zh_a_tick_tx_js``.
    NOTE(review): the divisors (1e4 -> 万, 1e5 -> 亿) imply the raw units — confirm against the data source.
    """
    # Anything that is not 6 or 9 characters long is treated as a stock name.
    if len(ts_code) not in (6, 9):
        ts_code = get_ts_code_from_name(ts_code)
    row = df.loc[ts_code, end_date]
    ticks = ak.stock_zh_a_tick_tx_js(code=add_postfix(type='ak', ts_code=ts_code))
    auc_amount = ticks.iloc[0]['成交金额']  # the first tick is used as the auction figure
    print(f'[{row["name"]}] 竞价成交{round(auc_amount/10000, 2)}万，开盘{round(row.open_pct, 2)}%，收盘{round(row.pct_chg,2)}%，量比{round(row.vol_ratio,2)}，成交额：{round(row.amount/100000, 2)}亿，实际换手率{round(row.turnover_rate_f,2)}%')

In [25]:
# from functools import reduce

# def str_join(series):
#     return reduce(lambda x, y: f'{x},{y}', series)
    
def merge_plate_names(lst):
    """Collapse a comma-separated plate-name string into a '+'-joined list of unique names.

    Names are stripped and empty fragments are dropped. Duplicates are removed while
    keeping first-seen order, so the result is stable across kernel restarts. Iterating a
    ``set`` of strings is not stable, because str hashing is randomised per process.
    Non-string input (e.g. NaN for a missing value) yields ``'NA'``.
    """
    if not isinstance(lst, str):
        return 'NA'
    names = (part.strip() for part in lst.split(','))
    # dict.fromkeys de-duplicates while preserving insertion order.
    return '+'.join(dict.fromkeys(n for n in names if n))

def _next_day_tag(rec):
    """Return the emoji tag describing one stock's next-day outcome (helper of ``stock_summaries``).

    ``rec`` is one row of the summary frame and must provide ``next_limit``,
    ``next_up_type``, ``next_pct_chg`` and ``next_open_pct``. Limit-up ('U') and
    limit-down ('D') take precedence. Otherwise, a close that differs from the open by
    more than 5 percentage points is tagged as a big down/up candle.
    """
    if rec['next_limit'] == 'U':
        tag = f'🔺涨停[{rec["next_up_type"]}]'
        return tag + '🚫' if rec['next_up_type'] == 'Y' else tag
    if rec['next_limit'] == 'D':
        return '❎跌停'
    body = rec['next_pct_chg'] - rec['next_open_pct']
    if body < -5:
        return '📉巨阴'
    if body > 5:
        return '📈大阳'
    return ''


def stock_summaries(df, ts_codes, end_date, cols=None, get_auc=False, top_cons=None, review_col=None, sort_by=None, sort_ascending=True):
    """Print a numbered one-line summary per stock on ``end_date`` and return the selected rows.

    Args:
        df: frame with a ``trade_date`` index level (rows are selected with ``df.xs``).
        ts_codes: ts_codes or stock names. Entries not 6 or 9 characters long are resolved
            with ``get_ts_code_from_name``. Codes with no row on ``end_date`` are reported
            and skipped.
        end_date: trade date to summarise.
        cols: columns passed to ``check_performance`` when ``review_col`` is set.
        get_auc: if True, fetch ticks with ``tx_auc`` and add the columns next_auc_amt,
            next_open, next_open_pct and next_auc_pvol_ratio from the first tick.
        top_cons: optional frame joined onto the selection. NOTE(review): the summary line
            reads ``plate_name``, which presumably comes from here or from ``df`` — confirm.
        review_col: if set, run ``check_performance`` on this column and append next-day
            review stats to each line.
        sort_by, sort_ascending: optional output ordering.

    Returns:
        The per-stock frame for ``end_date``, indexed by ts_code.
    """
    ts_codes = [get_ts_code_from_name(c) if len(c) != 6 and len(c) != 9 else c for c in ts_codes]
    today_df = df.xs(end_date, level='trade_date', drop_level=True)
    # .copy() makes the column writes below target an independent frame. Assigning into a
    # boolean-mask slice triggers SettingWithCopyWarning.
    target_df = today_df[today_df.index.isin(ts_codes)].copy()

    # Codes with no row on end_date would otherwise be appended as half-empty rows by the
    # auction loop, or raise KeyError in the print loop.
    missing = [c for c in ts_codes if c not in target_df.index]
    if missing:
        print(f'No data on {end_date} for: {missing}')
        ts_codes = [c for c in ts_codes if c in target_df.index]

    if top_cons is not None:
        target_df = target_df.join(top_cons)

    if cols is None:
        cols = ['name', 'close', 'pre5_pct_chg', 'pre20_pct_chg', 'circ_mv', 'total_mv', 'vol', 'turnover_rate_f', 'amount', 'conseq_up_num', 'strth', 'first_time', 'last_time', 'fd_amount']

    if get_auc:
        for ts_code in tqdm(ts_codes):
            first_tick = tx_auc(ts_code=ts_code).iloc[0]
            target_df.loc[ts_code, 'next_auc_amt'] = first_tick['成交金额'] / 1000
            target_df.loc[ts_code, 'next_open'] = first_tick['成交价格']
            target_df.loc[ts_code, 'next_open_pct'] = round((target_df.loc[ts_code, 'next_open'] / target_df.loc[ts_code, 'close'] - 1) * 100, 2)
            target_df.loc[ts_code, 'next_auc_pvol_ratio'] = target_df.loc[ts_code, 'next_auc_amt'] / target_df.loc[ts_code, 'amount']

    if sort_by is not None:
        target_df = target_df.sort_values(sort_by, ascending=sort_ascending)
        ts_codes = target_df.index

    if review_col:
        check_performance(target_df, cols=cols, y_col=review_col, display=True)

    print(f'Details of {end_date}:')
    for i, ts_code in enumerate(ts_codes, start=1):
        row = target_df.loc[ts_code]
        pre_stats = f'({merge_plate_names(row.plate_name)}, {int(row.conseq_up_num)}板 ({row.up_type})，流值{round(row.circ_mv/10000, 2)}亿，量比{round(row.vol_ratio,2)}，{round(row.amount/100000, 2)}亿，trf{round(row.turnover_rate_f,0)}%)'
        if review_col:
            review_stats = f'开{round(row["next_open_pct"],2)}%，竞价成交{round(row["next_auc_amt"]/100000, 2)}亿 ({round(row["next_auc_pvol_ratio"], 2)})，收{round(row["next_pct_chg"],2)}%。{_next_day_tag(row)}'
            print(f'{i}. {row["name"]} {pre_stats} 🧩=> {review_stats}')
        else:
            print(f'{i}. {row["name"]} {pre_stats} 🧩=> ')
    return target_df

In [26]:
def calc_top_cons(cons_today, cons):
    """Map each stock to a comma-joined list of its top three plates by ``pct_chg``.

    Args:
        cons_today: stock-to-plate membership providing ``ts_code``, ``name`` and
            ``plate_name``. These may be columns or index levels; ``reset_index`` exposes both.
        cons: per-plate stats with ``upstop_num``, ``pct_chg`` and ``p5_pct_chg``
            columns. ``plate_name`` must be an index level here, because only those three
            columns are selected before the merge.

    Returns:
        DataFrame indexed by ``ts_code`` with a single ``plate_name`` column such as
        ``'p1,p2,p3'``. It keeps at most three plates per stock name, in descending
        ``pct_chg`` order.
    """
    top_cons = (
        cons_today.reset_index()[['ts_code', 'name', 'plate_name']]
        .merge(cons[['upstop_num', 'pct_chg', 'p5_pct_chg']], on=['plate_name'])
        .sort_values(['pct_chg'], ascending=False)
        .groupby('name').head(3)
        # Joined inline. This notebook's own ``str_join`` definition is commented out, so
        # relying on it only works if some star import happens to provide it.
        .groupby('ts_code').agg({'plate_name': lambda plates: ','.join(map(str, plates))})
    )
    return top_cons


def save_plate_sum(plate_sum, tdate, plate_type):
    '''
    Build the per-plate summary rows for ``tdate`` and insert them into the ``plates`` table.

    Args:
        plate_sum: plate statistics with a ``trade_date`` index level. ``plate_name`` may be
            an index level or a column; it is renamed to ``name`` on output.
        tdate: trade date to extract (the trade_date level is kept).
        plate_type: label written to the ``plate_type`` column.

    Returns:
        The DataFrame passed to ``insert_df``.
    '''
    # .copy() so the column writes below do not target an xs() slice.
    cons = plate_sum.xs(tdate, level='trade_date', drop_level=False).copy()
    # Report the number of plates on tdate, not the row count of every date in plate_sum.
    print(f'Calculating {len(cons)} {plate_type} of {tdate}...')

    def _split_stocks(value):
        # Space-separated stock lists become Python lists; missing values become [].
        return value.split(' ') if isinstance(value, str) else []

    cons['plate_type'] = plate_type
    # method='min' gives competition ranking (1, 2, 2, 4). The default average rank, once
    # truncated by astype(int), mis-ranks ties of three or more.
    cons['daily_rank'] = cons.pct_chg.rank(ascending=False, method='min').astype(int)
    cons['upstop_stocks'] = cons.upstop_stocks.map(_split_stocks)
    cons['top_stocks'] = cons.top_stocks.map(_split_stocks)
    cons_df = cons.reset_index().rename(columns={'plate_name': 'name'})[
        ['name', 'trade_date', 'plate_type', 'pct_chg', 'amount', 'daily_rank', 'upstop_num', 'up', 'dn', 'fl', 'upstop_stocks', 'top_stocks']
    ]
    insert_df(df=cons_df, tablename='plates')
    return cons_df

In [27]:
def _north_net_flow(symbol, end_date):
    """Return the northbound net inflow for ``symbol`` on ``end_date``, divided by 10000 and rounded to 2 dp.

    ``symbol`` is a channel accepted by ``ak.stock_hsgt_north_net_flow_in_em`` (this
    notebook uses "北上", "沪股通" and "深股通"). The lookup is best effort: None is
    returned when the fetch fails or there is no row for ``end_date``.
    """
    try:
        flow = (
            ak.stock_hsgt_north_net_flow_in_em(symbol=symbol)
            .rename(columns={'date': 'trade_date'})
            .set_index('trade_date')
        )
        return round(flow.loc[end_date].value / 10000, 2)
    except Exception:
        return None


def market_summary(df, end_date):
    """Print a Chinese-language market summary for ``end_date``.

    It combines several sources:
    - turnover and median change of the universe selected by
      ``StockFilter(end_date=end_date).hs()``;
    - northbound flows;
    - the ``activities`` table (advance/decline counts, limit counts, vitality);
    - limit-up statistics on the non-ST universe;
    - ``upstop_trend`` ratios;
    - index quotes from ``ak_get_index``.
    """
    today_df = df.xs(end_date, level='trade_date', drop_level=True)
    today_hs = StockFilter(end_date=end_date).hs().filter(today_df)

    total_amt = round(today_hs.amount.sum()/100000, 2)

    # Best effort: a failed fetch prints None instead of aborting the summary.
    bs_amount = _north_net_flow("北上", end_date)
    hgt_amount = _north_net_flow("沪股通", end_date)
    sgt_amount = _north_net_flow("深股通", end_date)

    # Printed as 中位涨幅 (median change), so use the median rather than the mean.
    median_pct_chg = round(today_hs.pct_chg.median(), 2)
    emo = read_pg(table='activities')
    emo['trade_date'] = emo.trade_date.apply(lambda x: pdl.parse(x))
    emo = emo.set_index('trade_date').loc[end_date]

    # upstops (ST stocks excluded)
    today_hs_nost = StockFilter(end_date=end_date).hs().not_st().filter(today_df)
    today_uped = today_hs_nost[today_hs_nost.high == today_hs_nost.upstop_price]
    today_up = today_hs_nost[today_hs_nost.limit == 'U']
    # Share of limit-touching stocks that failed to close at the limit. The guard avoids
    # ZeroDivisionError on days when no stock touched the limit.
    up_fail_rate = round((len(today_uped) - len(today_up)) / len(today_uped) * 100, 2) if len(today_uped) else 0.0

    # trend: latest values of pre_up_pct / p_up_t_noup_pct, plus pre_up_cons_pct on end_date
    upstop_trend_df = upstop_trend(df, end_date, n_days=3)
    pre_up_pct = round(upstop_trend_df['pre_up_pct'].iat[-1], 2)
    p_up_t_noup_pct = round(upstop_trend_df['p_up_t_noup_pct'].iat[-1], 2)
    pre_up_cons_pct = round(upstop_trend_df.loc[end_date].pre_up_cons_pct, 2)     # 昨涨停晋级率 (yesterday's limit-up promotion rate)

    indices_df = ak_get_index().set_index('name')
    sh = indices_df.loc["上证指数"]
    sz = indices_df.loc["深证成指"]
    cyb = indices_df.loc["创业板指"]

    print(f'【{end_date}】')
    print(f'上证指数{round(sh.latest,1)}点，{sh.pct_chg}%, 深成指{round(sz.latest,1)}点，{sz.pct_chg}%, 创业板指{round(cyb.latest,1)}点，{cyb.pct_chg}%')
    print(f'总成交{total_amt}亿，北上总流入{bs_amount}亿（沪{hgt_amount}亿, 深{sgt_amount}亿）。涨跌比：{emo.up.astype("int")}/{emo.dn.astype("int")}，中位涨幅：{median_pct_chg}%，热度：{emo.vitality}。')
    print(f'涨跌停：{emo.real_upstop.astype("int")}/{emo.real_dnstop.astype("int")}，炸板率{up_fail_rate}%，连板高度{today_up.conseq_up_num.max()}板。昨涨停晋级率{pre_up_cons_pct}%，昨涨停平均涨幅{pre_up_pct}%，掉队股平均涨幅{p_up_t_noup_pct}%。')

In [28]:
def get_tfp(end_date):
    """Fetch and display the suspension list (停复牌, via ``ak.stock_tfp_em``) for ``end_date``.

    Rows are indexed by ``代码`` (code), and codes starting with '8' are dropped. Rows
    whose ``停牌原因`` (suspension reason) is '交易异常波动' are displayed first, if
    any; all other rows follow after a separator. Returns the filtered frame.
    """
    tfp = ak.stock_tfp_em(date=pdl.parse(end_date).strftime('%Y%m%d'))
    tfp = tfp.set_index('代码')
    tfp = tfp[~tfp.index.str.startswith('8')]
    abnormal = tfp['停牌原因'] == '交易异常波动'
    if abnormal.any():
        display(tfp[abnormal])
    print('=================================================================================')
    display(tfp[~abnormal])
    return tfp

In [29]:
import re
def round_print(text, *numbers):
    """Print ``text`` after filling each ``{}`` placeholder, left to right, with ``round(num, 2)``.

    Raises:
        ValueError: if the number of ``{}`` placeholders differs from ``len(numbers)``.
    """
    placeholder = '{}'
    if text.count(placeholder) != len(numbers):
        raise ValueError('Number of numbers does not match the pattern')
    filled = text
    for value in numbers:
        filled = filled.replace(placeholder, str(round(value, 2)), 1)
    print(filled)

In [31]:
# Basic columns for a compact view.
norm_cols = ['name', 'close', 'circ_mv', 'turnover_rate_f', 'amount']
# Limit-up candidate columns (same list as the default ``cols`` of stock_summaries).
up_cols = [
    'name', 'close', 'pre5_pct_chg', 'pre20_pct_chg', 'circ_mv', 'total_mv', 'vol',
    'turnover_rate_f', 'amount', 'conseq_up_num', 'strth', 'first_time', 'last_time',
    'fd_amount',
]
# Default column set displayed by check_performance.
UPSTOP_COLS = [
    'name', 'open', 'high', 'close', 'circ_mv', 'total_mv', 'vol', 'vol_type', 'vol_ratio',
    'amount', 'open_pct', 'pct_chg', 'up_type', 'first_time', 'last_time', 'open_times',
    'strth', 'turnover_rate_f', 'fl_ratio', 'fc_ratio',
]


In [21]:
# def check_performance(df, cols=UPSTOP_COLS, display=False, y_col='next_cvo'):
#     round_print(f'[Open >=0][{y_col}]' + 'Min: {}%, Max: {}%, Avg: {}%, Median: {}%', df[y_col].min(), df[y_col].max(), df[y_col].mean(), df[y_col].median())
#     df1 = df[df.next_open_pct >= 0]
#     round_print(f'[Open >=0][{y_col}]' + 'Min: {}%, Max: {}%, Avg: {}%, Median: {}%', df1[y_col].min(), df1[y_col].max(), df1[y_col].mean(), df1[y_col].median())
#     print(f'[Count] Upstop: {len(df[df.next_limit=="U"])}, Non_Y: {len(df[(df.next_limit=="U") & (df.next_up_type != "Y")])}, Total {len(df)}')
#     if display:
#         display_up_df(df[cols])

def check_performance(df, cols=UPSTOP_COLS, y_col='next_cvo', display=False):
    """Print distribution stats of ``y_col`` and next-day limit-up counts for ``df``.

    Two stat lines (min/max/mean/median via ``round_print``) are printed: one over all
    rows and one over rows with ``next_open_pct >= 0``. A count line follows with:
    - rows where ``next_limit == "U"``;
    - how many of those have ``next_up_type != "Y"``;
    - the total row count.
    When ``display`` is true, ``display_up_df(df[cols])`` is called as well.

    Note: the ``display`` flag shadows IPython's ``display`` inside this function.
    """
    stats_template = 'Min: {}%, Max: {}%, Avg: {}%, Median: {}%'

    def _describe(series):
        return series.min(), series.max(), series.mean(), series.median()

    round_print(f'[Total][{y_col}]' + stats_template, *_describe(df[y_col]))
    opened_up = df[df.next_open_pct >= 0]
    round_print(f'[Open >=0][{y_col}]' + stats_template, *_describe(opened_up[y_col]))
    limit_ups = df[df.next_limit == "U"]
    non_y = limit_ups[limit_ups.next_up_type != "Y"]
    print(f'[Count] Upstop: {len(limit_ups)}, Non_Y: {len(non_y)}, Total {len(df)}')
    if display:
        display_up_df(df[cols])

In [32]:
from psycopg2.errors import UniqueViolation

def get_auc_by_tick(tdate, ts_codes=None):
    """Collect auction figures for ``tdate`` from tick data and insert them into the ``auction`` table.

    For each stock, the first tick returned by ``tx_auc`` supplies three columns:
    ``open`` (price), ``auc_amt`` (amount / 1000) and ``auc_vol`` (vol / 10). Stocks
    whose tick frame is empty are skipped.

    Args:
        tdate: trade date written to the ``trade_date`` column. When ``ts_codes`` is None,
            it is also used to list all stocks via ``get_stock_basic``.
        ts_codes: ts_codes or stock names. Entries not 6 or 9 characters long are resolved
            with ``get_ts_code_from_name``.

    Returns:
        DataFrame indexed by ``ts_code`` with columns open, auc_amt, auc_vol, trade_date.
        Nothing is inserted when it is empty.
    """
    print(f'Fetching auc data as of {tdate}...')
    if ts_codes is None:
        ts_codes = get_stock_basic(tdate).index.to_list()
    # Collect plain records and build the frame once. Growing a DataFrame with .append is
    # quadratic, and DataFrame.append was removed in pandas 2.0.
    records = []
    for ts_code in tqdm(ts_codes):
        ts_code = get_ts_code_from_name(ts_code) if len(ts_code) != 6 and len(ts_code) != 9 else ts_code
        ticks = tx_auc(symbol=add_postfix(ts_code=ts_code, type='ak'))
        if ticks.empty:
            continue
        ticks.columns = ['time', 'price', 'price_diff', 'vol', 'amount', 'direction']
        first = ticks.iloc[0]
        records.append({
            'ts_code': ts_code,
            'open': first.price,
            'auc_amt': first.amount / 1000,
            'auc_vol': first.vol / 10,
            'trade_date': tdate,
        })
    auc = pd.DataFrame(records, columns=['ts_code', 'open', 'auc_amt', 'auc_vol', 'trade_date']).set_index('ts_code')
    if auc.empty:
        print('No auction ticks found; nothing inserted.')
    else:
        insert_df(auc.reset_index(), 'auction')
    return auc

                                                                                                                                                                                                            