In [None]:
import datetime
import gc
import subprocess
from pathlib import Path

import numpy as np
import pandas as pd
from tqdm.auto import tqdm

In [None]:
# Resume the incremental download on the day after the date recorded in `.last_date`
# (written by the last cell of this notebook); `stop` is today's date.
with open('.last_date', 'r') as f:
    # strip() tolerates a trailing newline if the file was edited by hand.
    start = (pd.to_datetime(f.read().strip()) + pd.Timedelta(days=1)).strftime('%Y%m%d')
stop = datetime.datetime.today().strftime('%Y%m%d')

# Fail fast when there is nothing new to fetch (e.g. a second run on the same day)
# instead of erroring later inside an API call. Zero-padded YYYYMMDD strings compare
# in date order.
if start > stop:
    raise ValueError(f"Nothing to download: next start date {start} is after today ({stop}).")

In [None]:
# Universe and trading calendar shared by the download cells below.
# NOTE(review): `today` is not used by this cell any more; kept in case other code relies on it.
today = datetime.datetime.today()
# type='CS' — presumably common stocks; only the order book ids are kept.
all_stock = all_instruments(type='CS', market='cn').order_book_id.to_list()
# Index codes whose constituent weights and quotes are downloaded alongside the stocks.
comprehesive_index = [
    '000905.XSHG', '000016.XSHG', '000001.XSHG', '000300.XSHG', 
    '000009.XSHG', '000010.XSHG', '399012.XSHE', '399106.XSHE', 
    '399001.XSHE', '399300.XSHE', '399102.XSHE', '000985.XSHG',
    '399101.XSHE', 
]
# CI005001.INDX ... CI005030.INDX — presumably CITICS industry indices; not referenced
# by any later cell in this notebook.
citics_index = [f'CI005{str(i).zfill(3)}.INDX' for i in range(1, 31)]
# Bound the calendar by `stop` (not a separately captured timestamp) so it covers exactly
# the same window as every other download in this run.
trade_dates = get_trading_dates(start_date=start, end_date=stop)
trade_dates = pd.DataFrame(pd.to_datetime(trade_dates), columns=['trade_date'])

## Index Weights

In [None]:
# One column of constituent weights per index, then swap the index levels and sort.
# NOTE(review): assumes index_weights returns rows indexed (date, order_book_id) — confirm.
weights = []
for index in tqdm(comprehesive_index):
    w = index_weights(index, start_date=start, end_date=stop)
    if w is None:
        # No weight data for this index in the window; skip it rather than crash.
        continue
    # pd.DataFrame() leaves a frame unchanged and wraps a Series into a one-column
    # frame, so the rename below works for either return type.
    w = pd.DataFrame(w).sort_index()
    w.columns = [index]
    weights.append(w)
if weights:
    weights = pd.concat(weights, axis=1).swaplevel().sort_index()
    weights.to_parquet(f'index-weights_{start}-{stop}.parquet')

## All Instruments Information

In [None]:
# Reference data for all instruments, keyed by order_book_id. Listing and delisting
# dates are parsed; values that cannot be parsed become NaT (errors='coerce').
instruments = (
    all_instruments()
    .assign(
        listed_date=lambda frame: pd.to_datetime(frame['listed_date'], errors='coerce'),
        de_listed_date=lambda frame: pd.to_datetime(frame['de_listed_date'], errors='coerce'),
    )
    .set_index('order_book_id')
)

In [None]:
# Saved under a fixed (undated) name; the archive cell near the end of the notebook
# bundles every *.parquet in the working directory, including this one.
instruments.to_parquet('instruments-info.parquet')

## Daily Market Data

In [None]:
def get_price_data(code, start, stop, frequency='1d'):
    """Download unadjusted bars for ``code`` between ``start`` and ``stop``.

    Minute bars (``frequency == '1m'``) get a column rename, ``num_trades`` is
    dropped when present, and everything is cast to float32.  Any other frequency
    takes the daily path, which adds:

    * ``adjfactor``          - post-adjusted close divided by the unadjusted close,
    * ``total_shares``       - the ``total`` column returned by ``get_shares``,
    * ``turnover``           - the ``today`` field of ``get_turnover_rate``,
    * ``st`` / ``suspended`` - boolean flags from ``is_st_stock`` / ``is_suspended``,

    all inner-joined on the index, whose levels are then named
    ``('order_book_id', 'date')``.  Non-flag columns are cast to float32.

    Parameters
    ----------
    code : str or list of str
        Order book id(s) passed straight to the data API.
    start, stop : str
        Date bounds passed as ``start_date`` / ``end_date``.
    frequency : str, default '1d'
        Bar frequency passed to ``get_price``.

    Returns
    -------
    pandas.DataFrame or None
        ``None`` when ``get_price`` returns no data.
    """
    data = get_price(
        order_book_ids=code,
        start_date=start,
        end_date=stop,
        adjust_type='none',
        frequency=frequency,
    )
    if data is None:
        return None

    if frequency == '1m':
        # NOTE(review): the daily path renames total_turnover -> amount, while this one
        # renames amount -> total_turnover. Left unchanged so existing minute files keep
        # their schema.
        # Drop num_trades only if present, so a response without it does not raise KeyError.
        data = data.rename({"amount": "total_turnover"}, axis=1).drop(columns='num_trades', errors='ignore')
        return data.astype('float32')

    post_close = get_price(
        order_book_ids=code,
        start_date=start,
        end_date=stop,
        fields=['close'],
        adjust_type='post',
    )['close']
    data['adjfactor'] = post_close / data['close']
    data = data.rename({"total_turnover": "amount"}, axis=1)
    data = pd.concat([
        data, get_shares(code, start_date=start, end_date=stop)
    ], axis=1, join='inner').rename(columns={"total": "total_shares"})
    data = pd.concat([
        data, get_turnover_rate(
            order_book_ids=code,
            start_date=start,
            end_date=stop,
            fields='today'
    )], axis=1, join='inner').rename(columns={"today": "turnover"})

    # The suspension / ST frames are stacked and level-swapped so that (presumably)
    # they line up with the (order_book_id, date) index of the bars for the inner join.
    suspended = is_suspended(order_book_ids=code, start_date=start, end_date=stop).stack().swaplevel()
    suspended.name = 'suspended'
    st = is_st_stock(order_book_ids=code, start_date=start, end_date=stop).stack().swaplevel()
    st.name = 'st'
    data = pd.concat([data, st, suspended], axis=1, join='inner')

    # Flags become bool; every other column becomes float32.
    flag_columns = {"st", "suspended"}
    types = {col: "float32" for col in data.columns if col not in flag_columns}
    types.update(st="bool", suspended="bool")
    data.index.names = ['order_book_id', 'date']
    return data.astype(types)

In [None]:
# data = []
# for stock in tqdm(all_stock + comprehesive_index):
#     data.append(get_data(stock, start, stop))
# pd.concat(data).to_parquet(f'quotes-day_{start}-{stop}.parquet')

In [None]:
# get_price_data returns None when there are no bars in the window; skip the write then.
daily_quotes = get_price_data(all_stock, start, stop)
if daily_quotes is not None:
    daily_quotes.to_parquet(f'quotes-day_{start}-{stop}.parquet')
del daily_quotes  # only needed for the write; release the memory

In [None]:
# Index daily bars come straight from get_price (not get_price_data), so no adjfactor,
# share, turnover or flag columns are added.
# NOTE(review): no adjust_type is passed, so the API default applies.
index_daily_quotes = get_price(comprehesive_index, start, stop)
if index_daily_quotes is not None:
    index_daily_quotes.to_parquet(f'index-quotes-day_{start}-{stop}.parquet')
del index_daily_quotes

## Minute Data

In [None]:
# Minute bars for every stock; skipped when the API returns nothing for the window.
minute_quotes = get_price_data(all_stock, start, stop, '1m')
if minute_quotes is not None:
    minute_quotes.to_parquet(f"quotes-min_{start}-{stop}.parquet")
del minute_quotes  # minute data is large; don't keep it alive in the kernel

In [None]:
# Minute bars for the index list; skipped when the API returns nothing for the window.
index_minute_quotes = get_price_data(comprehesive_index, start, stop, '1m')
if index_minute_quotes is not None:
    index_minute_quotes.to_parquet(f"index-quotes-min_{start}-{stop}.parquet")
del index_minute_quotes

In [None]:
# ms = pd.date_range(start, stop, freq='MS')
# me = pd.date_range(start, stop, freq='M')
# for i, (s, e) in tqdm(enumerate(zip(ms, me))):
#     price_data = get_price(order_book_ids=all_stock + comprehesive_index, start_date=s, 
#               end_date=e, frequency='1m', adjust_type='none').drop('num_trades', axis=1)
#     price_data['adjfactor'] =  get_price(order_book_ids=all_stock + comprehesive_index, start_date=s, 
#               end_date=e, frequency='1m', fields='close', adjust_type='post')['close'] / price_data['close']
#     price_data = price_data.astype('float32')
#     price_data.to_parquet(f'{s.strftime("%Y%m")}.parquet')
#     del price_data
#     gc.collect()
#     if (i + 1) % 12 == 0:
#         subprocess.run(['tar', '-cvzf', f'{e.year}.tar.gz'] + [f'{e.year}{str(mon).zfill(2)}.parquet' for mon in range(1, 13)])
#         subprocess.run(['rm', '-rf'] + [f'{e.year}{str(mon).zfill(2)}.parquet' for mon in range(1, 13)])

## Financial Data

In [None]:
def diff(x: pd.DataFrame, keep_first=False):
    """Row-wise first difference of ``x``.

    With ``keep_first=True`` the first row holds the original values of ``x``
    instead of NaN. An empty frame yields an empty result.
    """
    res = x.diff()
    if keep_first and len(x):
        res.iloc[0] = x.iloc[0]
    return res


def get_financial_data(code, start, stop, keep_first=False, fields=None):
    """Fetch factor rows for ``code`` that changed within ``[start, stop]``.

    Data is requested from one trading day before ``start`` so the first day of
    the window can be compared with the previous value. A row is kept when at
    least one field differs (non-zero, non-NaN difference) from the same
    instrument's previous row.

    Parameters
    ----------
    code : str or list of str
        Order book id(s) passed to ``get_factor``.
    start, stop : str
        Window bounds; ``start`` is shifted back one trading day for the request.
    keep_first : bool, default False
        Also keep each instrument's first returned row (used for an initial full load).
    fields : list of str, optional
        Factor names to request. Defaults to the global ``all_field``, resolved at
        call time. Every name must end with a 6-character suffix such as
        ``"_ttm_0"``, because that suffix is stripped from the output columns.

    Returns
    -------
    pandas.DataFrame or None
        float32 frame sorted by index, or ``None`` when ``get_factor`` returns nothing.

    NOTE(review): a field going from NaN to a value diffs to NaN, so on its own it
    does not count as a change.
    """
    if fields is None:
        fields = all_field
    day_before_start = trading_date_offset(start, -1)
    df = get_factor(order_book_ids=code, factor=fields, start_date=day_before_start, end_date=stop)
    if df is None:
        return None
    # Per-instrument difference that keeps df's index unchanged. A groupby.apply
    # would prepend an extra group-key level on pandas >= 2.0 and break df.loc[idx].
    changes = df.groupby(level=0).diff()
    if keep_first:
        # Treat each instrument's first row as a change so that it is kept.
        is_first = ~df.index.get_level_values(0).duplicated(keep='first')
        changes.loc[is_first, :] = df.loc[is_first, :].to_numpy()
    idx = changes.replace(0, np.nan).dropna(axis=0, how='all').index
    df = df.loc[idx]
    # Strip the 6-character suffix (e.g. "_ttm_0") from each field name.
    df.columns = df.columns.str.slice(0, -6)
    return df.sort_index().astype('float32')

In [None]:
# Build the factor list from the RQData fundamentals dictionary page. Tables 3, 4 and 5
# are taken (by position) as the income, balance-sheet and cash-flow field lists.
# NOTE(review): positional table access silently picks wrong tables if the page layout changes.
fields = pd.read_html("https://www.ricequant.com/doc/rqdata/python/fundamentals-dictionary.html#%E5%9F%BA%E7%A1%80%E8%B4%A2%E5%8A%A1%E6%95%B0%E6%8D%AE")
income_sheet_field, balance_sheet_field, cashflow_sheet_field = (
    fields[table_no]["字段"] for table_no in (3, 4, 5)
)
# Every field name gets the "_ttm_0" suffix (presumably the current-period TTM value).
all_field = [
    name
    for sheet in (income_sheet_field, balance_sheet_field, cashflow_sheet_field)
    for name in (sheet + "_ttm_0").to_list()
]

In [None]:
# from joblib import Parallel, delayed
# data = Parallel(n_jobs=-1, backend='loky')(delayed(get_financial_data)
#     (code, "20000105", "20231231", True) for code in tqdm(all_stock)
# )
# pd.concat(data).to_parquet(f'financial_{start}-{stop}.parquet')

In [None]:
# Changed financial rows for the universe; nothing is written when the API has no data.
financial_data = get_financial_data(code=all_stock, start=start, stop=stop)
if financial_data is not None:
    financial_data.to_parquet('financial_{}-{}.parquet'.format(start, stop))

## Industry Information

In [None]:
# Industry classification of every stock on each trading date (source='citics_2019',
# level=0 passed through as-is), stacked into one (order_book_id, date)-indexed frame.
industry_mapping = []
for date in tqdm(trade_dates.trade_date):
    mapping = get_instrument_industry(order_book_ids=all_stock, date=date, source='citics_2019', level=0)
    if mapping is None:
        continue
    mapping.index = pd.MultiIndex.from_arrays([[date] * len(mapping), mapping.index], names=['date', mapping.index.name])
    mapping['source'] = 'citics'
    industry_mapping.append(mapping)
# pd.concat([]) raises; skip the write when there were no trading dates or no data.
if industry_mapping:
    industry_mapping = pd.concat(industry_mapping).swaplevel().sort_index()
    industry_mapping.to_parquet(f'industry-info_{start}-{stop}.parquet')

## Dividends and Splits

In [None]:
def dividend_split(code, start, stop):
    """Collect dividend and split factors for ``code`` between ``start`` and ``stop``.

    Returns a frame whose index levels are renamed to ``('order_book_id', 'date')``:

    * ``splitfactor`` = split_coefficient_to / split_coefficient_from - 1, keeping
      the first row for a duplicated index entry;
    * ``divfactor``   = dividend_cash_before_tax / round_lot, summed over all
      dividends with the same order_book_id and ex_dividend_date.

    A column is absent when its API call returns None. ``None`` is returned when
    both calls return None.
    """
    if isinstance(code, str):
        code = [code]

    divinfo = get_dividend(code, start_date=start, end_date=stop, market='cn')
    if divinfo is not None:
        divinfo = divinfo.reset_index()
        divinfo['divfactor'] = divinfo['dividend_cash_before_tax'] / divinfo['round_lot']
        # Group and sum in one step so each total stays attached to its own
        # (order_book_id, ex_dividend_date) key, whatever the input row order.
        # Rows without an ex-dividend date drop out of the groupby.
        divinfo = (
            divinfo.groupby(['order_book_id', 'ex_dividend_date'])['divfactor']
            .sum()
            .to_frame()
        )

    # Query splits for the requested codes only.
    splitinfo = get_split(code, start_date=start, end_date=stop, market='cn')
    if splitinfo is not None:
        splitinfo['splitfactor'] = splitinfo['split_coefficient_to'] / splitinfo['split_coefficient_from'] - 1
        splitinfo = splitinfo.drop(['split_coefficient_to', 'split_coefficient_from', 'cum_factor', 'book_closure_date', 'payable_date'], axis=1)
        splitinfo = splitinfo.loc[~splitinfo.index.duplicated(keep='first')]

    if splitinfo is None and divinfo is None:
        return None
    # pd.concat silently drops a None entry, so either frame may be missing here.
    spdiv = pd.concat([splitinfo, divinfo], axis=1)
    spdiv.index.names = ['order_book_id', 'date']
    return spdiv

In [None]:
# dividend_split returns None when neither dividends nor splits were returned for the window.
df = dividend_split(code=all_stock, start=start, stop=stop)
if df is not None:
    df.to_parquet('dividend-split_{}-{}.parquet'.format(start, stop))

## Securities Margin

In [None]:
# data = []
# for stock in tqdm(all_stock):
#     df = get_securities_margin(stock, start_date=start, end_date=stop)
#     if df is not None:
#         data.append(df.astype('float32'))
# pd.concat(data).to_parquet(f'security-margin_{start}-{stop}.parquet')

In [None]:
# Securities-margin data for the whole stock universe; nothing is written when the API returns None.
df = get_securities_margin(all_stock, end_date=stop, start_date=start)
if df is not None:
    df.to_parquet('security-margin_{}-{}.parquet'.format(start, stop))

## Stock Connect

In [None]:
# data = []
# for stock in tqdm(all_stock):
#     df = get_stock_connect(stock, start_date=start, end_date=stop)
#     if df is not None:
#         data.append(df.astype('float32'))
# pd.concat(data).to_parquet(f'stock-connect_{start}-{stop}.parquet')

In [None]:
# Stock Connect data for the universe; the index levels are named before saving.
df = get_stock_connect(all_stock, start_date=start, end_date=stop)
if df is not None:
    df = df.rename_axis(["order_book_id", "date"])
    df.to_parquet('stock-connect_{}-{}.parquet'.format(start, stop))

In [None]:
# Bundle every *.parquet in the working directory into one archive, then delete the
# originals. check=True raises if tar fails, so the files are only removed after the
# archive was written. tar refuses to create an empty archive, hence the guard.
data_files = sorted(Path('.').glob('*.parquet'))
if data_files:
    subprocess.run(
        ["tar", "-czvf", f"data_{start}-{stop}.tar.gz", *map(str, data_files)],
        check=True,
    )
    for file in data_files:
        file.unlink()

In [None]:
# Record the last date covered by this run; the first cell resumes from the day after.
# The intraday guard below (roll `stop` back one day when run before 15:00) is commented
# out, so a run during the trading session records today as done.
# if datetime.datetime.today().strftime("%H%M") < "1500":
#     stop = (pd.to_datetime(stop) - pd.Timedelta(days=1)).strftime("%Y%m%d")
with open('.last_date', mode='w') as last_date_file:
    last_date_file.write(stop)