# 讀入台指期資料到Sqlite DB

##  1 永豐金證券Api

申請永豐金證券帳號及API帳號 https://www.sinotrade.com.tw/ec/20191125/Main/index.aspx

### 登入 shioaji

In [None]:
import shioaji as sj

#登入帳戶   #透入輸入取得
api_key=input("api_key:")
secret_key=input("secret_key:")

# 登入帳戶  
# api = sj.Shioaji()
api = sj.Shioaji(simulation=True) # 模擬模式
api.login(
    api_key,secret_key
)

### 登出shioaji

In [None]:
#不再下載資料時，再執行
#api.logout()

## 2. 下載標的(contract)

In [2]:
print(api.Contracts.Futures["TXFR1"]) 
print(api.Contracts.Futures["MXFR1"])

code='TXFR1' symbol='TXFR1' name='臺股期貨近月' category='TXF' delivery_month='202308' delivery_date='2023/08/16' underlying_kind='I' unit=1 limit_up=18537.0 limit_down=15167.0 reference=16852.0 update_date='2023/08/09' target_code='TXFH3'
code='MXFR1' symbol='MXFR1' name='小型臺指近月' category='MXF' delivery_month='202308' delivery_date='2023/08/16' underlying_kind='I' unit=1 limit_up=18537.0 limit_down=15167.0 reference=16852.0 update_date='2023/08/09' target_code='MXFH3'


###  函數：呼叫 shioaji api取得kbar

In [None]:
import sqlite3
import pandas as pd
from time import sleep
import datetime   # 新增這行

##API相關
def get_kbar(api, contract, start, end, timeout=100000):
    if timeout > 0:
        print(f'kbars = api.kbars({contract}, start={start}, end={end}, timeout={timeout})')
        kbars = api.kbars(contract, start=start, end=end, timeout=timeout)
    else:
        print(f'kbars = api.kbars({contract}, start={start}, end={end})')
        kbars = api.kbars(contract, start=start, end=end)

    df = pd.DataFrame({**kbars}) 
    df.index = pd.to_datetime(df.ts)
    df = df.drop(columns="ts")
    df.drop(df.tail(1).index, inplace=True)
    
    sleep(10)
    return df

### 函數：判斷資料庫的資料日期，並決定api需要取得的日期區間

In [None]:
from datetime import date
from datetime import timedelta

def backFillKbars(api, contractObj, contractName, start_date=None):
    today = date.today()
    
    # 如果沒有提供開始日期，就默認為兩年前
    if start_date is None:
        start_date = today - timedelta(days=2*365)
    print(f'目標處理日期: {start_date} - {today}')
    
    #開始讀取資料庫現有資料    
    dbName = input("你的DB路徑加檔名:")
    print(dbName)
    
    tablename = contractName
    conn = sqlite3.connect(dbName)
    #如果沒有就建立
    sql = """CREATE TABLE IF NOT EXISTS {}(
    "ts" timestamp,
    "Volume" integer,
    "Close" real,
    "Amount" real,
    "Open" real,
    "High" real,
    "Low" real)""".format(tablename)
    conn.execute(sql)
    
    kbars_ori = pd.read_sql("SELECT * FROM {}".format(tablename), conn, parse_dates=["ts"], index_col=["ts"])

    if kbars_ori.empty:  # 資料庫中無任何資料
        start = start_date
        end = today
        print(f'資料庫中無任何資料: 目標{start} - {end}')
        if start <= end:
            kbars = get_kbar(api, contractObj, start=str(start), end=str(end))
            kbars_ori = pd.concat([kbars_ori, kbars])
            print(f'取得: {start} - {end}')
        
    else:
        first_date = kbars_ori.index.min().strftime("%Y-%m-%d") #大Y
        last_date = kbars_ori.index.max().strftime("%Y-%m-%d")
        print(f'資料庫中日期: {first_date} - {last_date}')
        
        if datetime.strptime(first_date, "%Y-%m-%d").date() <= start_date:  # 資料庫中最早的日期不晚於開始日期
            start = datetime.strptime(last_date, "%Y-%m-%d").date() +  timedelta(1)
            end = today
            print(f'最早的日期不晚於開始日期: {start} - {end}')
            if start <= end:
                kbars = get_kbar(api, contractObj, start=str(start), end=str(end))
                kbars_ori = pd.concat([kbars_ori, kbars])
                print(f'取得: {start} - {end}')
                
        else:  # 資料庫中最早的日期晚於開始日期
            # 兩段日期區間，一段是開始日期至最早的日期前一天，一段是最後的日期後一天至今天
            start1 = start_date
            end1 = datetime.strptime(first_date, "%Y-%m-%d").date() -  timedelta(1)
            start2 = datetime.strptime(last_date, "%Y-%m-%d").date() + timedelta(1)
            
            end2 = today
            print(f'開始日期早於已有資料: {start1} - {end1} | {start2} - {end2}')

            if start1 <= end1:
                kbars1 = get_kbar(api, contractObj, start=str(start1), end=str(end1))
                kbars_ori = pd.concat([kbars_ori, kbars1])
                print(f'取得1: {start1} - {end1} ')

            if start2 <= end2:
                kbars2 = get_kbar(api, contractObj, start=str(start2), end=str(end2))
                kbars_ori = pd.concat([kbars_ori, kbars2])
                print(f'取得2: {start2} - {end2}')

    final_kbars = (
        kbars_ori.reset_index()
        .drop_duplicates(subset=["ts"], keep="last")
        .set_index(["ts"])
        .sort_index()
    )
    final_kbars.to_sql(tablename, conn, if_exists="replace")
    conn.close()


#### 主程式

In [None]:
from datetime import datetime

#主程式 addstrategy
if __name__ == '__main__':

    start_date = datetime.strptime('2023-07-01', '%Y-%m-%d').date()

    # 更新期貨
    future_list = ["TXFR1","MXFR1"] #台指近月    

    # kbars
    for future in future_list:
        print(f'== {future} start ==')
        contract = api.Contracts.Futures[future]
     
        backFillKbars(api=api,contractObj=contract,contractName=future, start_date=start_date) 
        print(f'== {future} end ==')

    print(f'== Process end ==')

### 工具：檢查kbar資料庫的日期

In [None]:
 # 如果没有提供开始日期，就默认为两年前
from datetime import date
from datetime import timedelta
import sqlite3
import pandas as pd
from time import sleep
import datetime   # 新增這行

def checkTsRange(dbname, tablename):
    conn = sqlite3.connect(dbname)
    df = pd.read_sql('SELECT * FROM "{}"'.format(tablename), conn, parse_dates=["ts"])
    conn.close()

    if df.empty:
        # 如果表为空，返回None
        return None, None

    first_date = df["ts"].min().strftime("%Y-%m-%d")
    last_date = df["ts"].max().strftime("%Y-%m-%d")
    
    return first_date, last_date


# 假设one_month_ago是一个预先定义的值，代表从一年前的日期
dbName = input("你的DB路徑加檔名:")
tablename = "MXFR1" # 小台指近月

first_day, last_day = checkTsRange(dbname=dbName, tablename=tablename)
print(f'kbars {tablename}: {first_day} 至 {last_day}')

tablename = "TXFR1" # 小台指近月

first_day, last_day = checkTsRange(dbname=dbName, tablename=tablename)
print(f'kbars {tablename}: {first_day} 至 {last_day}')