In [None]:
import gc
import subprocess
import pandas as pd
import numpy as np
import datetime
from tqdm import tqdm

In [None]:
# Shared configuration used by every download cell below.
today = datetime.datetime.today()
start, end = '20150101', '20230101'  # YYYYMMDD bounds passed to the data API

# Common-stock universe (type='CS', market='cn'), as a plain list of order_book_ids.
all_stock = all_instruments(type='CS', market='cn').order_book_id.to_list()

# NOTE: the name keeps its original spelling ("comprehesive") because later cells refer to it.
comprehesive_index = ['000300.XSHG', '000001.XSHG', '000016.XSHG']
# CI005001.INDX ... CI005030.INDX (presumably the CITIC industry indices — confirm).
citics_index = [f'CI005{code:03d}.INDX' for code in range(1, 31)]

# Trading calendar from 2010 onward; the end date is ~27 years ahead, presumably to
# pull every calendar date the API knows about.
calendar_end = today + datetime.timedelta(days=10000)
trade_dates = pd.DataFrame(
    pd.to_datetime(get_trading_dates(start_date='20100101', end_date=calendar_end)),
    columns=['trade_date'],
)

## Index Weights

In [None]:
# NOTE: this re-binds the global `comprehesive_index` (same members, different order).
comprehesive_index = ['000016.XSHG', '000001.XSHG', '000300.XSHG']

weight_frames = []
for index_id in tqdm(comprehesive_index):
    w = index_weights(index_id, start_date=start, end_date=end)
    # Insert the index id as a middle level: (date, index_id, order_book_id).
    dates = w.index.get_level_values(0)
    constituents = w.index.get_level_values(1)
    w.index = pd.MultiIndex.from_arrays(
        [dates, [index_id] * len(w), constituents],
        names=['date', 'index_id', 'order_book_id'],
    )
    weight_frames.append(w)

weights = pd.concat(weight_frames)
weights

In [None]:
# `index_weights` may return a Series (the cell above only rewrites `.index` before
# concatenating). pandas defines `to_parquet` on DataFrame only, and parquet requires
# string column names, so wrap a Series in a single column named 'weight' first.
(weights.to_frame(name='weight') if isinstance(weights, pd.Series) else weights).to_parquet('index-weights.parquet')

## All Instruments Information

In [None]:
# Every instrument type; unparseable listing/delisting dates become NaT.
instruments_raw = all_instruments()
instruments = instruments_raw.assign(
    listed_date=pd.to_datetime(instruments_raw['listed_date'], errors='coerce'),
    de_listed_date=pd.to_datetime(instruments_raw['de_listed_date'], errors='coerce'),
).set_index('order_book_id')

In [None]:
INSTRUMENTS_PATH = 'instruments.par'  # parquet content despite the short extension
instruments.to_parquet(INSTRUMENTS_PATH)

## Minute Data

In [None]:
# Minute bars, one parquet file per month. Each completed calendar year is bundled into
# `{year}.tar.gz`, and its monthly files are removed afterwards.
ms = pd.date_range(start, end, freq='MS')
me = pd.date_range(start, end, freq='M')
pending_files = []  # monthly files written since the last archive
for s, e in tqdm(list(zip(ms, me))):
    price_data = get_price(order_book_ids=all_stock, start_date=s,
              end_date=e, frequency='1m', adjust_type='none').drop('num_trades', axis=1)
    # Post-adjusted close / raw close gives a per-bar adjustment factor.
    price_data['adjfactor'] = get_price(order_book_ids=all_stock, start_date=s,
              end_date=e, frequency='1m', fields='close', adjust_type='post')['close'] / price_data['close']
    price_data = price_data.astype('float32')
    month_file = f'{s.strftime("%Y%m")}.parquet'
    price_data.to_parquet(month_file)
    pending_files.append(month_file)
    del price_data
    gc.collect()
    # Archive on the calendar (December), not on a loop counter. The old `(i + 1) % 12`
    # rule only grouped by year when `start` fell in January.
    if e.month == 12:
        # check=True: delete the monthly files only after tar succeeded. Otherwise a failed
        # archive would still be followed by `rm`, losing the downloaded data.
        subprocess.run(['tar', '-cvzf', f'{e.year}.tar.gz'] + pending_files, check=True)
        subprocess.run(['rm', '-f'] + pending_files, check=True)
        pending_files = []

## Market Daily

In [None]:
# Daily stock bars, one gzip-compressed parquet per month.
# NOTE(review): files are named `YYYYMM.parquet`, the same names the minute-data cell uses
# before archiving; run the two cells in different directories to avoid overwrites.
ms = pd.date_range(start, end, freq='MS')
me = pd.date_range(start, end, freq='M')
for s, e in tqdm(list(zip(ms, me))):
    prices = get_price(order_book_ids=all_stock, start_date=s, end_date=e, fields=None, adjust_type='none')
    post_prices = get_price(order_book_ids=all_stock, start_date=s, end_date=e, fields='close', adjust_type='post')
    # Post-adjusted close / raw close gives a per-day adjustment factor.
    prices['adjfactor'] = post_prices['close'] / prices['close']
    prices = prices.swaplevel().sort_index().rename(columns={"total_turnover": "amount"})
    prices['pct_change'] = prices['close'] / prices['prev_close'] - 1  # fraction, not percent
    # Downcast everything to float32 except the trade count. uint16 (max 65,535) overflowed
    # for liquid stocks' daily trade counts, so use uint32. Cast it directly from its
    # original dtype rather than round-tripping through float32.
    dtypes = {col: 'float32' for col in prices.columns}
    dtypes['num_trades'] = 'uint32'
    prices = prices.astype(dtypes)
    prices.to_parquet(f'{s.strftime("%Y%m")}.parquet', compression='gzip')

## Index Market Daily

In [None]:
# Fetch one extra trading day before `start` so the first in-range row has a valid change;
# that extra day is sliced off at the end.
index_price = (
    get_price(
        order_book_ids=citics_index + comprehesive_index,
        start_date=get_previous_trading_date(start),
        end_date=end,
    )
    .swaplevel()
    .sort_index()
)
close_by_index = index_price['close'].groupby(level=1)
# NOTE(review): pct_change here is in percent (x100), whereas the stock daily cell stores a
# fraction; `pct_amount` is the absolute close-to-close change, not a traded amount.
index_price['pct_change'] = close_by_index.pct_change() * 100
index_price['pct_amount'] = close_by_index.diff()
index_price = index_price[start:]

In [None]:
INDEX_MARKET_DAILY_PATH = 'index-market-daily.parquet'
index_price.to_parquet(INDEX_MARKET_DAILY_PATH)

## Industry Information

In [None]:
# Industry membership (source 'citics_2019', level=0) for each trading date in [start, end].
in_range = trade_dates.trade_date.between(start, end)
industry_frames = []
for date in tqdm(trade_dates.loc[in_range, 'trade_date']):
    mapping = get_instrument_industry(order_book_ids=all_stock, date=date, source='citics_2019', level=0)
    # Prefix the existing per-instrument index with the snapshot date.
    ids = mapping.index
    mapping.index = pd.MultiIndex.from_arrays([[date] * len(mapping), ids], names=['date', ids.name])
    mapping['source'] = 'citics'
    industry_frames.append(mapping)
industry_mapping = pd.concat(industry_frames)
industry_mapping

In [None]:
PLATE_INFO_PATH = 'plate_info.par'  # per-date industry (plate) membership, parquet format
industry_mapping.to_parquet(PLATE_INFO_PATH)

## Derivative Indicators

In [None]:
# Turnover rates over several windows, fetched one stock at a time.
# pd.concat silently drops stocks for which the API returned None.
turnover = pd.concat(
    [
        get_turnover_rate(stock, start, end, fields=['today', 'week', 'month', 'year'])
        for stock in tqdm(all_stock)
    ],
    axis=0,
)

In [None]:
# NOTE: not idempotent — re-running swaps the index levels back. Run once, right after
# the per-stock download cell.
turnover = (
    turnover
    .swaplevel()
    .sort_index()
    .astype('float32')
)
turnover.to_parquet('turnover.parquet')

In [None]:
# Re-binds `turnover` to the single daily ('today') series used by the factor cell below;
# this replaces the multi-window frame from the previous cells.
turnover = (
    get_turnover_rate(all_stock, start, end, fields='today')
    .swaplevel()
    .sort_index()
)
turnover.columns = ['turnover']

In [None]:
# Valuation / size factors joined with the daily turnover from the previous cell.
# NOTE(review): `factor` is only displayed, never written to disk.
factors = ['pb_ratio_ttm', 'market_cap_2']
factor_values = (
    get_factor(order_book_ids=all_stock, factor=factors, start_date=start, end_date=end)
    .swaplevel()
    .sort_index()
)
factor = pd.concat([factor_values, turnover], axis=1)
factor.index.names = ['date', 'order_book_id']
factor

## HS Connect Information

In [None]:
# NOTE(review): the date range is hard-coded and differs from the notebook-wide start/end.
hsg_holding = get_stock_connect(
    all_stock, start_date='20100101', end_date='20220810'
).astype('float32')
hsg_holding.to_parquet('hsg_holding.parquet')

## PIT Data

In [None]:
# Point-in-time view of a slowly changing factor: keep a value only on the dates it changes.
data = get_factor(order_book_ids=all_stock, factor=["policy_dividend_payout"], start_date=start, end_date=end)
# unstack(level=0) moves the first index level (presumably order_book_id — confirm) into
# columns, leaving one row per date.
wide = data.squeeze().unstack(level=0)
# A cell is a change point when it holds a value that differs from the previous row.
# The former `diff().fillna(0).astype('bool')` mapped NaN -> value transitions (including
# each stock's first observation) to NaN -> 0 -> False, so those values were dropped.
sig = wide.notna() & wide.ne(wide.shift())
dered = wide.where(sig)
dered = dered.stack()
# Exploratory check: dates on which one sample stock's value changed.
t = dered.loc[:, "002423.XSHE"].index

## Transform Data into the Local Storage Format (oxygene)

In [6]:
import sys
import importlib
import pandas as pd
from pathlib import Path
from joblib import Parallel, delayed

# Make the parent directory importable so the project-local `oxygene` package resolves.
sys.path.append(str(Path('..')))
oxygene = importlib.import_module(name='oxygene')

In [7]:
def transform(path, inst_col=1, date_col=0, uri='../asset/kline_daily'):
    """Load one parquet file and feed it through ``oxygene.Transformer``.

    Args:
        path: parquet file to read with ``pd.read_parquet``.
        inst_col: forwarded unchanged to ``oxygene.Transformer``; presumably the
            position of the instrument index level (TODO confirm).
        date_col: forwarded unchanged; presumably the position of the date level.
        uri: destination passed to ``oxygene.Transformer``.

    Returns:
        None; any output is whatever ``Transformer.transform()`` writes.
    """
    frame = pd.read_parquet(path)
    transformer = oxygene.Transformer(
        data=frame,
        inst_col=inst_col,
        date_col=date_col,
        uri=uri,
    )
    transformer.transform()

In [None]:
data_dir = Path('../asset/backups/kline_min/')
# NOTE(review): `transform` runs with its default uri ('../asset/kline_daily') although the
# inputs are the kline_min backups — confirm this destination is intended.
# NOTE(review): iterdir() yields every entry (e.g. archives, hidden folders); anything that
# is not a parquet file will make pd.read_parquet fail.
pending_jobs = (delayed(transform)(file_path) for file_path in data_dir.iterdir())
Parallel(n_jobs=-1, backend='threading')(pending_jobs);