In [1]:
import Importer  # Help directly import Ipython notebooks
# (Reference: https://nbviewer.jupyter.org/github/jupyter/notebook/blob/master/docs/source/examples/Notebook/Importing%20Notebooks.ipynb)
import Tushare_data_fetcher
import pymysql
import pymssql
import tushare as ts
import numpy as np
import pandas as pd
import sqlalchemy
import datetime
import json
import urllib.parse

importing Jupyter notebook from Tushare_data_fetcher.ipynb


## 0. Basic configurations

### 0.1 Local configurations

In [2]:
token = 'ce9611f48f0fe5d6fb5abe3303367254f1ff2836a0fbc2fa72e15e82'
ts.set_token(token)

In [3]:
with open('../config/config.json', 'r') as f:
    config = json.load(f)
con = pymysql.connect(**config)
cur = con.cursor()
alchemy_str = 'mysql+pymysql://' + config['user'] + ':' + config['password'] + \
    '@' + config['host'] + ':' + str(config['port']) + '/' + config['database']
alchemy_engine = sqlalchemy.create_engine(alchemy_str)

In [4]:
cur.execute("show tables")
table_list = [tuple[0] for tuple in cur.fetchall()]
print(table_list)

['asset_return', 'balance', 'cashflow', 'daily', 'est_earning', 'est_pe', 'finance', 'shibor']


### 0.2 Wind Database Configurations

In [5]:
with open('../config/wind_config.json', 'r') as f:
    config_wind = json.load(f)
conn_wind = pymssql.connect(**config_wind)
cursor_wind = conn_wind.cursor()
password_parse = urllib.parse.quote_plus(config_wind['password'])
alchemy_str_wind = 'mssql+pymssql://' + config_wind['user'] + ':' + password_parse + \
    '@' + config_wind['host'] + ':1433' + '/' + \
    config_wind['database'] + '?charset=GBK'
alchemy_engine_wind = sqlalchemy.create_engine(
    alchemy_str_wind, encoding='GBK')

## 1. Create corresponding tables (Initialization) and fill with data from 2015-01-01

**ATTENTION: The TABLE name would be automatically transformed to lower cases (i.e. Daily $\rightarrow$ daily)**

**Only saving codes for indirect Tables are kept. The creation procedures are also deleted for simplicity.**

### Table Daliy

In [None]:
daily_create = '''
    CREATE TABLE Daily(
    ts_code VARCHAR(10) NOT NULL,
    trade_date DATE NOT NULL,
    turnover_rate NUMERIC(18,4),
    turnover_rate_f NUMERIC(18,4),
    pe_ttm NUMERIC(18,4),
    pb NUMERIC(18,4),
    total_share NUMERIC(38,4),
    float_share NUMERIC(38,4),
    total_mv NUMERIC(38,4),
    circ_mv NUMERIC(38,4));
    '''

In [None]:
cur.execute(daily_create)

In [None]:
trade_cal = Tushare_data_fetcher.get_trade_calendar(start_date='20150101',
                                                    end_date='20210831')
for date in trade_cal:
    daily_df = Tushare_data_fetcher.get_daily_indicators(date=date)
    daily_df.to_sql(name='daily', con=alchemy_engine,
                    if_exists='append', index=False)
    print(f'{date}: Daily indicators saved.')

### Table Return

In [None]:
return_create = '''
    CREATE TABLE asset_return(
    ts_code VARCHAR(10) NOT NULL,
    trade_date DATE NOT NULL,
    close NUMERIC(18,4),
    pre_close NUMERIC(18,4),
    pct_chg NUMERIC(18,4),
    vol NUMERIC(38,4),
    amount NUMERIC(38,4));
    '''

In [None]:
query = '''
    SELECT Min(trade_date)
    FROM asset_return;
    '''
q_df = pd.read_sql_query(query, con=alchemy_engine)

In [None]:
q_df.iloc[0, 0]

### Table Shibor

In [None]:
shibor_create = '''
    CREATE TABLE shibor(
    date DATE NOT NULL,
    `on` NUMERIC(18,4),
    1w NUMERIC(18,4),
    2w NUMERIC(18,4),
    1m NUMERIC(18,4),
    3m NUMERIC(18,4),
    6m NUMERIC(18,4),
    9m NUMERIC(18,4),
    1y NUMERIC(18,4));
    '''
# 在MySQL中，为了显式区分MySQL的关键字与普通字符，需要使用一个反引号``

### Table Finance

In [None]:
finance_create = '''
    CREATE TABLE Finance(
    ts_code VARCHAR(10) NOT NULL,
    ann_date DATE,
    f_ann_date DATE,
    end_date DATE NOT NULL,
    report_type INTEGER,
    revenue NUMERIC(38,4),
    n_income_attr_p NUMERIC(38,4));
    '''

In [None]:
for year in range(2015, 2021):
    period = str(year) + '1231'
    finance_df = Tushare_data_fetcher.get_finance_indicators(period=period)
    if (finance_df.empty == False):
        finance_df.to_sql(name='finance',
                          con=alchemy_engine,
                          if_exists='append',
                          index=False)
        print(f'{year}:Done!')

### Table Cashflow

In [None]:
cashflow_create = '''
    CREATE TABLE Cashflow(
    ts_code VARCHAR(10) NOT NULL,
    ann_date DATE,
    f_ann_date DATE,
    end_date DATE NOT NULL,
    report_type INTEGER,
    n_cashflow_act NUMERIC(38,4),
    im_net_cashflow_oper_act NUMERIC(38,4));
    '''

In [None]:
for year in range(2015, 2021):
    period = str(year) + '1231'
    cashflow_df = Tushare_data_fetcher.get_cashflow_indicators(period=period)
    if (cashflow_df.empty == False):
        cashflow_df.to_sql(name='cashflow',
                           con=alchemy_engine,
                           if_exists='append',
                           index=False)
        print(f'{year}:Done!')

### Table Balance

In [None]:
balance_create = '''
    CREATE TABLE Balance(
    ts_code VARCHAR(10) NOT NULL,
    ann_date DATE,
    f_ann_date DATE,
    end_date DATE NOT NULL,
    report_type INTEGER,
    total_share NUMERIC(38,4),
    total_assets NUMERIC(38,4),
    total_cur_liab NUMERIC(38,4),
    total_liab NUMERIC(38,4),
    oth_eqt_tools_p_shr NUMERIC(38,4));
    '''

In [None]:
for year in range(2015, 2021):
    for suffix in ['0331', '0630', '0930', '1231']:
        period = str(year) + suffix
        balance_df = Tushare_data_fetcher.get_balance_indicators(period=period)
        if (balance_df.empty == False):
            balance_df.to_sql(name='balance',
                              con=alchemy_engine,
                              if_exists='append',
                              index=False)
            print(f'{period}:Done!')

### Table EST_PE

In [20]:
EST_PE_create = '''
    CREATE TABLE est_pe(
    wind_code VARCHAR(10) NOT NULL,
    est_date DATE,
    rolling_type VARCHAR(10),
    est_pe NUMERIC(38,4));
    '''
cur.execute(EST_PE_create)

0

In [22]:
EST_PE_query = '''
                SELECT S_INFO_WINDCODE AS wind_code, EST_DT AS est_date,
                    ROLLING_TYPE AS rolling_type, EST_PE as est_pe
                FROM ASHARECONSENSUSROLLINGDATA
                WHERE EST_DT > '20210801'
                AND (ROLLING_TYPE = 'FTTM' OR ROLLING_TYPE = 'FY1');
            '''
EST_PE_df = pd.read_sql(EST_PE_query, con=alchemy_engine_wind)

### Table EST_Earning

In [6]:
EST_EARNING_create = '''
    CREATE TABLE est_earning(
    wind_code VARCHAR(10) NOT NULL,
    est_date DATE,
    rolling_type VARCHAR(10),
    est_earning NUMERIC(38,4));
    '''
cur.execute(EST_EARNING_create)

OperationalError: (1050, "Table 'est_earning' already exists")

In [7]:
Earning_Growth_EST_query = '''
                        SELECT S_INFO_WINDCODE AS wind_code , EST_DT AS est_date,
                        ROLLING_TYPE AS rolling_type, NET_PROFIT AS est_earning
                        FROM ASHARECONSENSUSROLLINGDATA
                        WHERE EST_DT > '20210801'
                        AND (ROLLING_TYPE = 'YOY' OR ROLLING_TYPE = 'YOY2')
                        '''
EST_Earning_df = pd.read_sql(Earning_Growth_EST_query, con=alchemy_engine_wind)

## 2. Class DataSaver

In [6]:
class DataSaver:
    def __init__(self, token, market_port='000985.CSI'):
        # Set market portfolio
        self.market_port = market_port

        # Tushare
        self.token = token
        ts.set_token(self.token)

        # Local database
        with open('../config/config.json', 'r') as f:
            config = json.load(f)
        con = pymysql.connect(**config)
        cur = con.cursor()
        alchemy_str = 'mysql+pymysql://' + config['user'] + ':' + config['password'] + \
            '@' + config['host'] + ':' + \
            str(config['port']) + '/' + config['database']
        self.alchemy_engine = sqlalchemy.create_engine(alchemy_str)

        # Wind database
        with open('../config/wind_config.json', 'r') as f:
            config_wind = json.load(f)
        conn_wind = pymssql.connect(**config_wind)
        cursor_wind = conn_wind.cursor()
        password_parse = urllib.parse.quote_plus(config_wind['password'])
        alchemy_str_wind = 'mssql+pymssql://' + config_wind['user'] + ':' + password_parse + \
            '@' + config_wind['host'] + ':1433' + '/' + \
            config_wind['database'] + '?charset=GBK'
        self.alchemy_engine_wind = sqlalchemy.create_engine(
            alchemy_str_wind, encoding='GBK')
        return

    def set_token(self, token):
        self.token = token
        ts.set_token(self.token)
        return

    def set_market_portfolio(self, mar_port):
        self.market_port = mar_port
        return

    def _save_daily_indicators(self, start_date, end_date):  # 测试通过
        trade_cal = Tushare_data_fetcher.get_trade_calendar(
            start_date=start_date, end_date=end_date)
        for date in trade_cal:
            daily_df = Tushare_data_fetcher.get_daily_indicators(date=date)
            if (daily_df.empty == False):
                daily_df.to_sql(name='daily',
                                con=self.alchemy_engine,
                                if_exists='append',
                                index=False)
        return daily_df.empty == False

    def _save_stock_return(self, start_date, end_date):  # 测试通过
        if (start_date != end_date):
            stock_return_df = Tushare_data_fetcher.get_interval_stock_return(
                start_date=start_date, end_date=end_date, token=self.token)
        else:
            stock_return_df = Tushare_data_fetcher.get_daily_stock_return(
                start_date)
        if (stock_return_df.empty == False):
            stock_return_df.to_sql(name='asset_return',
                                   con=self.alchemy_engine,
                                   if_exists='append',
                                   index=False)
        return stock_return_df.empty == False

    def _save_index_return(self, start_date, end_date):  # 测试通过
        index_return_df = Tushare_data_fetcher.get_interval_market_portfolio_return(
            start_date=start_date, end_date=end_date, ts_code=self.market_port)
        if (index_return_df.empty == False):
            index_return_df.to_sql(name='asset_return',
                                   con=self.alchemy_engine,
                                   if_exists='append',
                                   index=False)
        return index_return_df.empty == False

    def _save_shibor(self, start_date, end_date):  # 测试通过
        shibor_df = Tushare_data_fetcher.get_interval_shibor(
            start_date=start_date, end_date=end_date)
        if (shibor_df.empty == False):
            shibor_df.to_sql(name='shibor',
                             con=self.alchemy_engine,
                             if_exists='append',
                             index=False)
        return shibor_df.empty == False

    def _save_finance(self, start_date, end_date):  # 测试通过
        start_year = start_date[:4]
        end_year = end_date[:4]
        for year in range(int(start_year), int(end_year) + 1):
            period = str(year) + '1231'
            finance_df = Tushare_data_fetcher.get_finance_indicators(
                period=period)
            if (finance_df.empty == False):
                finance_df.to_sql(name='finance',
                                  con=self.alchemy_engine,
                                  if_exists='append',
                                  index=False)
        return

    def _save_cashflow(self, start_date, end_date):  # 测试通过
        start_year = start_date[:4]
        end_year = end_date[:4]
        for year in range(int(start_year), int(end_year) + 1):
            period = str(year) + '1231'
            cashflow_df = Tushare_data_fetcher.get_cashflow_indicators(
                period=period)
            if (cashflow_df.empty == False):
                cashflow_df.to_sql(name='cashflow',
                                   con=self.alchemy_engine,
                                   if_exists='append',
                                   index=False)
        return

    def _save_balance(self,
                      start_date,
                      end_date,
                      report_date=['0331', '0630', '0930', '1231']):  # 测试通过
        start_year = start_date[:4]
        end_year = end_date[:4]
        for year in range(int(start_year), int(end_year) + 1):
            for suffix in report_date:
                period = str(year) + suffix
                balance_df = Tushare_data_fetcher.get_balance_indicators(
                    period=period)
                if (balance_df.empty == False):
                    balance_df.to_sql(name='balance',
                                      con=self.alchemy_engine,
                                      if_exists='append',
                                      index=False)
        return

    def _save_est_pe(self, start_date, end_date):
        EST_PE_query = f'''
                SELECT S_INFO_WINDCODE AS wind_code, EST_DT AS est_date,
                    ROLLING_TYPE AS rolling_type, EST_PE as est_pe
                FROM ASHARECONSENSUSROLLINGDATA
                WHERE EST_DT BETWEEN \'{start_date}\' AND \'{end_date }\'
                AND (ROLLING_TYPE = 'FTTM' OR ROLLING_TYPE = 'FY1');
            '''
        EST_PE_df = pd.read_sql(EST_PE_query, con=self.alchemy_engine_wind)
        EST_PE_df.to_sql(name='est_pe', con=self.alchemy_engine,
                         if_exists='append', index=False)
        return EST_PE_df.emptyty == False

    def _save_est_earning(self, start_date, end_date):
        Earning_Growth_EST_query = f'''
                            SELECT S_INFO_WINDCODE AS wind_code , EST_DT AS est_date,
                            ROLLING_TYPE AS rolling_type, NET_PROFIT AS est_earning
                            FROM ASHARECONSENSUSROLLINGDATA
                            WHERE EST_DT BETWEEN \'{start_date}\' AND \'{end_date }\'
                            AND (ROLLING_TYPE = 'YOY' OR ROLLING_TYPE = 'YOY2')
                            '''
        EST_Earning_df = pd.read_sql(
            Earning_Growth_EST_query, con=self.alchemy_engine_wind)
        EST_Earning_df.to_sql(name='est_earning', con=self.alchemy_engine,
                              if_exists='append', index=False)
        return EST_Earning_df.empty == False

    def _save_all_daily_data(self, start_date, end_date, test_mode=False):  # 可推测测试通过
        print(f'Effective date: {start_date} - {end_date}')
        if(test_mode == False):  # 由于函数可正常运行，测试阶段仅进行实际区间输出
            if (self._save_daily_indicators(start_date, end_date)):
                print('Daily saved successfully!')
            if (self._save_stock_return(start_date, end_date)):
                print('Stock return saved successfully!')
            if (self._save_index_return(start_date, end_date)):
                print('Index return saved successfully!')
            if (self._save_shibor(start_date, end_date)):
                print('Shibor saved successfully!')
            if (self._save_est_pe(start_date, end_date)):
                print('EST_PE saved successfully!')
            if (self._save_est_earning(start_date, end_date)):
                print('EST_EARNING saved successfully!')
        return

    def save_all_quarter_data(self, start_date,
                              end_date):  # ATTENTION: 不保证覆盖问题，应当自行检查日期
        # 可推测测试通过
        self._save_finance(start_date, end_date)
        print('Finance saved successfully!')
        self._save_cashflow(start_date, end_date)
        print('Cashflow saved successfully!')
        self._save_balance(start_date, end_date)
        print('Balance saved successfully!')
        return

    def save_all_daily_data(self, start_date, end_date, test_mode=False):
        # 日期区间测试通过，据此可以推测函数功能通过测试
        start_query = '''
            SELECT MIN(trade_date)
            FROM asset_return;
            '''
        start_query_df = pd.read_sql_query(start_query,
                                           con=self.alchemy_engine)
        cur_start_date = start_query_df.iloc[0, 0].strftime('%Y%m%d')

        end_query = '''
            SELECT MAX(trade_date)
            FROM asset_return;
            '''
        end_query_df = pd.read_sql_query(end_query, con=self.alchemy_engine)
        cur_end_date = end_query_df.iloc[0, 0].strftime('%Y%m%d')

        if (start_date < cur_start_date):
            if (end_date < cur_start_date):
                self._save_all_daily_data(start_date, end_date, test_mode)
            else:
                temp_end_datetime = start_query_df.iloc[
                    0, 0] - datetime.timedelta(days=1)
                temp_end_date = temp_end_datetime.strftime('%Y%m%d')
                self._save_all_daily_data(start_date, temp_end_date, test_mode)

        if (end_date > cur_end_date):
            if (start_date > cur_end_date):
                self._save_all_daily_data(start_date, end_date, test_mode)
            else:
                temp_start_datetime = end_query_df.iloc[
                    0, 0] + datetime.timedelta(days=1)
                temp_start_date = temp_start_datetime.strftime('%Y%m%d')
                self._save_all_daily_data(temp_start_date, end_date, test_mode)
        return

    def update_daily_to_date(self, test_mode=False):  # 通过测试
        end_query = '''
            SELECT MAX(trade_date)
            FROM asset_return;
            '''
        end_query_df = pd.read_sql_query(end_query, con=self.alchemy_engine)
        temp_start_datetime = end_query_df.iloc[0,
                                                0] + datetime.timedelta(days=1)
        temp_start_date = temp_start_datetime.strftime('%Y%m%d')

        yesterday_datetime = datetime.datetime.now() - datetime.timedelta(
            days=1)
        yesterday_date = yesterday_datetime.strftime('%Y%m%d')
        if (temp_start_date <= yesterday_date):
            self.save_all_daily_data(
                temp_start_date, yesterday_date, test_mode)
            print(f'Update to {yesterday_date}!')

    def update_daily(self):  # 可推测通过测试
        yesterday_datetime = datetime.datetime.now() - datetime.timedelta(
            days=1)
        yesterday_date = yesterday_datetime.strftime('%Y%m%d')
        self.save_all_daily_data(yesterday_date, yesterday_date)
        print(f'{yesterday_date}: Regular update finished!')

    def update_finance(self, year):  # 手动更新，可推测通过测试
        period = str(year) + '1231'
        self._save_finance(period, period)

    def update_cashflow(self, year):  # 手动更新，可推测通过测试
        period = str(year) + '1231'
        self._save_cashflow(period, period)

    def update_balance(self,
                       year,
                       report_date=['0331', '0630', '0930', '1231']):
        # 手动更新，可推测通过测试
        dummy_date = str(year) + '1231'
        self._save_balance(dummy_date, dummy_date, report_date)

In [7]:
Saver = DataSaver(token)

In [None]:
Saver._save_est_pe(start_date='20210701', end_date='20210731')

In [None]:
Saver._save_est_earning(start_date='20210701', end_date='20210731')

In [None]:
print('Done')