# 台股資料庫建立

## 引入套件

In [2]:
from pandas import DataFrame, date_range, read_csv
from datetime import timedelta, datetime
import os
import time
import re
import requests
from pymongo import MongoClient

### 引入vnpy套件

In [3]:
from vnpy.trader.database import database_manager
from vnpy.trader.object import BarData
from vnpy.trader.constant import Interval, Exchange

### 更新資料到vnpy的台股資料庫(mongo)

In [4]:
def update_vnpyTSE(date: datetime, data:list, db_name='TSE'):
    """
    params:
        date [datetime] : 日期
        data [list of distionary] : 當日的所有股票交易資料
        db_name [str]: 資料庫名稱
    """
    bars = []
    for row in data:
        try:
            bar = BarData(
                symbol=row['Ticker'],
                exchange=Exchange.TSE,
                datetime=datetime.strptime(row['Date'], '%Y-%m-%d'),
                interval=Interval.DAILY,
                volume = row['Volume'],
                open_price = row['Open'],
                high_price = row['High'],
                low_price = row['Low'],
                close_price = row['Close'],
                gateway_name='DB'
                )
            bars.append(bar)
        except Exception as e:
            print(e)
            print(row)
            print(row['Date'])
    try:
        database_manager.save_bar_data(bars, db_name)
        print(f'儲存 {date} 資料成功')
    except Exception as e:
        print(f'儲存 {date} 資料失敗')
        print(e)

## 創建資料儲存路徑

In [5]:
path = os.path.dirname(os.path.abspath('__file__'))
if not os.path.isdir(path):
    path = os.getcwd()
tsepath = os.path.join(path, 'TSE')
if not os.path.isdir(tsepath):
    os.makedirs(tsepath)
otcpath = os.path.join(path, 'OTC')
if not os.path.isdir(otcpath):
    os.makedirs(otcpath)

## 建立處理資料的方法

In [1]:
header = {'User-Agent':'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36', 'upgrade-insecure-requests': '1'}
def crawl_json_data(url):
    """
    爬取以json格式回傳的url資料，
    """
    try:
        res = requests.get(url, headers=header)
    except:
        if res.status_code != 200:
            time.sleep(45)
            return crawl_json_data(url)
    else:
        return res.json()

def update_data_dict(d):
    # 去除 Index資料
    if 'Index' in d:
        del d['Index']
    
    # 去除不必要的資料
    delete_k = []
    for key in d.keys():
        if re.match(r'_[0-9]*', key):
            delete_k.append(key)
    for key in delete_k:
        d.pop(key)
    return d

def changeToFloat(x):
    """
    將文字資料做轉換，補上NA
    以及去除數字較大帶有逗點的狀況
    """
    try:
        if isinstance(x, str):
            if '-' in x:
                return float('nan')
        return float(str(x).replace(',',''))
    except:
        return x

## 爬取上市股票歷史資料

In [6]:
def TSE_HistoricalPrice(i):
    output = []
    try:
        date = i.strftime('%Y%m%d')
        url = 'https://www.twse.com.tw/exchangeReport/MI_INDEX?response=json&date=' + \
            date + '&type=ALLBUT0999&_=1553846233285'
        js = crawl_json_data(url)
        try: # 2011 某日才改變資料放的位置
            data = js['data9']
        except:
            data = js['data8']
        for k in range(len(data)):
            o,h,l,c,v = [changeToFloat(x) for x in data[k][5:9] + data[k][2:3]]
            output.append(update_data_dict({
                'Date':i.strftime('%Y-%m-%d'),
                'Ticker':data[k][0],
                'Open':o,
                'High':h,
                'Low':l,
                'Close':c,
                'Volume':int(v),
            }))
    except Exception as e:
        print(e)
        print(js.keys())
    else:
        return output

## 爬取上櫃股票歷史資料

In [7]:
def OTC_HistoricalPrice(i):
    output = []
    try:
        date = '/'.join([str(i.year - 1911), i.strftime('%m/%d')])
        url = 'https://www.tpex.org.tw/web/stock/aftertrading/otc_quotes_no1430/stk_wn1430_result.php?l=zh-tw&d=' + \
            date + '&se=EW&_=1553850502415'
        js = crawl_json_data(url)
        data = js["aaData"]
        for k in range(len(data)):
            o,h,l,c,v = [changeToFloat(x) for x in data[k][4:7]+data[k][2:3]+data[k][8:9]]
            output.append(update_data_dict({
                'Date':i.strftime("%Y-%m-%d"),
                'Ticker':data[k][0],
                'Open':o,
                'High':h,
                'Low':l,
                'Close':c,
                'Volume':int(v),
            }))
    except Exception as e:
        print(e)
        print(js.keys())
    else:
        return output

## 建立資料庫

In [8]:
def Crawl_TWSE(date):
    try:
        # crawl tse daily data
        # 判斷是否曾經爬取過這一天的資料
        # 如果有的話則讀取本地資料
        if os.path.isfile(os.path.join(
                tsepath, f"{date.strftime('%Y-%m-%d')}.txt")): 
            df_tse = read_csv(os.path.join(
                tsepath, f"{date.strftime('%Y-%m-%d')}.txt"), sep='\t')
            data_tse = [update_data_dict(x) for x in list(df_tse.T.to_dict().values())]
        else: # 如果沒有的話才進入證交所爬取資料
            data_tse = TSE_HistoricalPrice(date)

        # 確認是否有取得我們要的資料
        # 有資料才進行存檔與放入資料庫的動作
        if len(data_tse) > 0:
            DataFrame(data_tse).to_csv(os.path.join(
                tsepath, f"{date.strftime('%Y-%m-%d')}.txt"), sep='\t', index=None, float_format='%g')
            update_vnpyTSE(date, data_tse, db_name)
        print(f'Update {date} Historical Price of tse success')
    except Exception as e:
        print('tse',e)
        
def Crawl_CPEX(date):
    try:
        if date >= datetime(2007,1,2): # 上櫃股票的資料比較晚才有
            # crawl otc daily data
            # 判斷是否曾經爬取過這一天的資料
            # 如果有的話則讀取本地資料
            if os.path.isfile(os.path.join(
                    otcpath, f"{date.strftime('%Y-%m-%d')}.txt")):
                df_otc = read_csv(os.path.join(
                    otcpath, f"{date.strftime('%Y-%m-%d')}.txt"), sep='\t')
                data_otc = [update_data_dict(x) for x in list(df_otc.T.to_dict().values())]
            else:
                data_otc = OTC_HistoricalPrice(date)

            # 確認是否有取得我們要的資料
            # 有資料才進行存檔與放入資料庫的動作
            if len(data_otc) > 0:
                DataFrame(data_otc).to_csv(os.path.join(
                    otcpath, f"{date.strftime('%Y-%m-%d')}.txt"), sep='\t', index=None, float_format='%g')
                update_vnpyTSE(date, data_otc, db_name)
            print(f'Update {date} Historical Price of otc success')
    except Exception as e:
        print('otc',e)

In [None]:
if __name__ == "__main__":
    try:
        schema = 'vnpy_test' # 根據在vnpy中設定的那個database名稱
        db_name = 'Test' # 可自行更換資料庫(table)名稱
        client = MongoClient('mongodb://192.168.1.110:27017') #依照vnpy中設定的database的ip與port
        schema = client['vnpy_test']
        collections = schema.list_collection_names()
        if db_name in collections: # 資料庫存在
            table = schema[db_name]
            nums = table.count()
            if not nums: # 資料庫沒有資料，所以進行完整的建立, 取資料公告的第一天
                start_date = datetime(2004,2,11) # 此日期為證交所公告最早的日期 # 2004-2-11
            else: # 資料庫有資料，取上次更新的最後一天做為起始日期
                start_date = list(table.distinct('datetime'))[-1]
        else: # 資料庫不存在, 所以進行完整的建立, 取資料公告的第一天
            start_date = datetime(2004,2,11) # 此日期為證交所公告最早的日期 # 2004-2-11
        end_date = datetime.today() # 最後一日皆設定開啟的當日
        dates = date_range(start_date, end_date)
        for date in dates:
            Crawl_TWSE(date)
            Crawl_CPEX(date)
#             time.sleep(3)
    except Exception as e:
        print(e)

  # Remove the CWD from sys.path while we load stuff.


儲存 2008-03-11 00:00:00 資料成功
Update 2008-03-11 00:00:00 Historical Price of tse success
儲存 2008-03-11 00:00:00 資料成功
Update 2008-03-11 00:00:00 Historical Price of otc success
儲存 2008-03-12 00:00:00 資料成功
Update 2008-03-12 00:00:00 Historical Price of tse success
儲存 2008-03-12 00:00:00 資料成功
Update 2008-03-12 00:00:00 Historical Price of otc success
儲存 2008-03-13 00:00:00 資料成功
Update 2008-03-13 00:00:00 Historical Price of tse success
儲存 2008-03-13 00:00:00 資料成功
Update 2008-03-13 00:00:00 Historical Price of otc success
儲存 2008-03-14 00:00:00 資料成功
Update 2008-03-14 00:00:00 Historical Price of tse success
儲存 2008-03-14 00:00:00 資料成功
Update 2008-03-14 00:00:00 Historical Price of otc success
'data8'
dict_keys(['stat', 'groups9'])
tse object of type 'NoneType' has no len()
Update 2008-03-15 00:00:00 Historical Price of otc success
'data8'
dict_keys(['stat', 'groups9'])
tse object of type 'NoneType' has no len()
Update 2008-03-16 00:00:00 Historical Price of otc success
儲存 2008-03-17 00:00:00

儲存 2008-04-30 00:00:00 資料成功
Update 2008-04-30 00:00:00 Historical Price of tse success
儲存 2008-04-30 00:00:00 資料成功
Update 2008-04-30 00:00:00 Historical Price of otc success
'data8'
dict_keys(['stat', 'groups9'])
tse object of type 'NoneType' has no len()
Update 2008-05-01 00:00:00 Historical Price of otc success
儲存 2008-05-02 00:00:00 資料成功
Update 2008-05-02 00:00:00 Historical Price of tse success
儲存 2008-05-02 00:00:00 資料成功
Update 2008-05-02 00:00:00 Historical Price of otc success
'data8'
dict_keys(['stat', 'groups9'])
tse object of type 'NoneType' has no len()
Update 2008-05-03 00:00:00 Historical Price of otc success
'data8'
dict_keys(['stat', 'groups9'])
tse object of type 'NoneType' has no len()
Update 2008-05-04 00:00:00 Historical Price of otc success
儲存 2008-05-05 00:00:00 資料成功
Update 2008-05-05 00:00:00 Historical Price of tse success
儲存 2008-05-05 00:00:00 資料成功
Update 2008-05-05 00:00:00 Historical Price of otc success
儲存 2008-05-06 00:00:00 資料成功
Update 2008-05-06 00:00:00 