In [38]:
import datetime
import requests
import sqlite3
import time

db_name = "db.sqlite"

def execute_db(fname, sql_cmd):
    conn = sqlite3.connect(fname)
    c = conn.cursor()
    c.execute(sql_cmd)
    conn.commit()
    conn.close()
    
def select_db(fname, sql_cmd):
    conn = sqlite3.connect(fname)
    c = conn.cursor()
    c.execute(sql_cmd)
    rows = c.fetchall()
    conn.close()
    return rows
    
    
def str_to_num(s, c_type):
    # 將字串 s 移除逗點與句點後轉為 int/float
    # 若非 float 或 int 則不處理, 直接回傳
    if c_type not in ["int", "float"]:
        return s
    s = s.replace(",", "")
    try:
        if c_type == "int":
            return int(s)
        else:
            return float(s)
    except:  # 轉換失敗則回傳 -1
        return -1


def crawl_price(date):
    # 將 datetime物件字串化為 YYYYMMDD 的格式
    datestr = date.strftime("%Y%m%d")  # %Y會回傳4位數的格式, %m會回傳2位數的格式, %d會回傳2位數的格式

    # 從證交所網站獲取指定日期的所有個股的資訊
    resp = requests.get(
        "https://www.twse.com.tw/exchangeReport/MI_INDEX?response=json&date="
        + datestr + "&type=ALLBUT0999")
    data = resp.json()  # json = JavaScript Object Notation
    if "data9" not in data:  # 當天沒有資料，可能為假日
        return None

    # 欄位定義
    # ["證券代號", "日期(新增)", "成交股數", "成交筆數", "成交金額", "開盤價", 
    # "最高價", "最低價", "收盤價", "漲跌價差", "最後揭示買價", "最後揭示買量", 
    # "最後揭示賣價", "最後揭示賣量", "本益比"]
    else:
        types = ["text", "datetime", "int", "int", "int", "float", "float", "float",
                 "float", "float", "float", "int", "float", "int", "float"]

        prices = []
        for item in data["data9"]:
            if item[2] == "0":  # 當日成交股數為 0 的資料列
                continue
            # 第 2 欄的證券名稱及第 10 欄 (漲跌(+/-), 為 HTML 碼) 不需要,故移除之
            filtered = item[:1] + item[2:9] + item[10:]
            # 插入日期欄位到第 2 欄
            filtered = filtered[:1] + [date.strftime("%Y-%m-%d")] + filtered[1:]
            prices.append([str_to_num(s, types[i]) for i, s in enumerate(filtered)])
    return prices


def bulk_insert(fname, bulk_data):
    conn = sqlite3.connect(fname)
    c = conn.cursor()
    c.execute("BEGIN TRANSACTION")
    for d in bulk_data:
        values = ["'" + str(e) + "'" for e in d]
        cmd = 'INSERT OR REPLACE INTO daily_price VALUES({})'.format(','.join(values))
        c.execute(cmd)
    c.execute("COMMIT")
    conn.close()


def update_db(date_from, date_to):
    print("更新資料: {} 到 {}".format(date_from.strftime("%Y-%m-%d"), date_to.strftime("%Y-%m-%d")))
    current = date_from
    while current <= date_to:
        prices = crawl_price(current)
        if prices:
            bulk_insert(db_name, prices)
            print(current.strftime("%Y-%m-%d"), "...成功")
        else:
            print(current.strftime("%Y-%m-%d"), "...失敗(可能為假日)")
        current += datetime.timedelta(days=1)
        time.sleep(3)  # 放慢爬蟲速度


def get_date_range_from_db(fname):
    conn = sqlite3.connect(fname)
    c = conn.cursor()
    c.execute("select * from daily_price order by 日期 ASC LIMIT 1;")
    date_from = datetime.datetime.strptime(list(c)[0][1], "%Y-%m-%d")
    c.execute("select * from daily_price order by 日期 DESC LIMIT 1;")
    date_to = datetime.datetime.strptime(list(c)[0][1], "%Y-%m-%d")
    conn.close()
    return date_from, date_to

# cmd = 'DROP TABLE daily_price'

# cmd = 'CREATE TABLE "daily_price" (\
# `證券代號` TEXT,\
# `日期` DATE,\
# `成交股數` INTEGER,\
# `成交筆數` INTEGER,\
# `成交金額` INTEGER,\
# `開盤價` REAL,\
# `最高價` REAL,\
# `最低價` REAL,\
# `收盤價` REAL,\
# `漲跌價差` REAL,\
# `最後揭示買價` REAL,\
# `最後揭示買量` INTEGER,\
# `最後揭示賣價` REAL,\
# `最後揭示賣量` INTEGER,\
# `本益比` REAL\
# )'
# execute_db(db_name, cmd)

In [16]:
# 主程式

# bulk_data = crawl_price(datetime.datetime(2019, 1, 23))
# bulk_insert(db_name, bulk_data)
# db_from, db_to = get_date_range_from_db(db_name)
# print("資料庫日期: {} 到 {}".format(db_from.strftime("%Y-%m-%d"), db_to.strftime("%Y-%m-%d")))
# date_from = db_to + datetime.timedelta(days=1)
# date_to = datetime.datetime.today()
# update_db(date_from, date_to)

In [66]:
def get_data(fname, stock_id ,period):
    conn = sqlite3.connect(fname)
    c = conn.cursor()
    cmd = "SELECT 證券代號, 日期, 收盤價 FROM daily_price WHERE 證券代號 == '{:s}' ORDER BY 日期 LIMIT {:d};".format(stock_id, period)
    c.execute(cmd)
    rows = c.fetchall()
    conn.close()
    return rows


In [68]:
def calculate_kvalue_and_dvalue(data):
    dates = [r[1] for r in data]
    prices = [r[2] for r in data] # prices are ordered by date time, index 0 is first day, index -1 is last day
    # 前八天 RSV為 0, 第八天 K值 == D值 == 50
   
    k_value_list = [0] * 7 + [50]
    d_value_list = [0] * 7 + [50]
    for day, price in enumerate(prices):
        if day >= 8: # 第九天開始計算 RSV
            min_price = min(prices[day-8:day+1]) # 九天內的最小收盤價
            max_price = max(prices[day-8:day+1]) # 九天內的最大收盤價
            if day == len(prices) - 1:
                min_price = min(prices[-9:]) # 九天內的最小收盤價
                max_price = max(prices[-9:]) # 九天內的最大收盤價
            
            rsv = 100 * (price - min_price) / (max_price - min_price)

            k_value = rsv * 1/3 + k_value_list[day - 1] * 2/3
            k_value_list.append(k_value)

            d_value = k_value * 1/3 + d_value_list[day - 1] * 2/3
            d_value_list.append(d_value)

            prev_k_value = k_value
            prev_d_value = d_value
        
    return k_value_list, d_value_list
        

def show_buy_signal(k_values, d_values):
    signal_day = [0] * 8
    for day in range(8, len(k_values)):
        if k_values[day - 1] < d_values[day - 1] and k_values[day] > d_values[day] and k_values[day] < 30:
            signal_day.append(1)
        else:
            signal_day.append(0)
    return signal_day

prices_0050 = get_data(db_name, "0050", 240)
dates = [r[1] for r in prices_0050]
prices = [r[2] for r in prices_0050]
print("起始日期: {} (收盤價: {}), 結束日期: {} (收盤價: {}) ({} 天)".format(dates[0], prices[0], dates[-1], prices[-1], len(dates)))

k_values, d_values = calculate_kvalue_and_dvalue(prices_0050)
signal_day = show_buy_signal(k_values, d_values)
print("本金 10 萬元. 期間有 {} 次買進訊號, 一次投入 1 萬元".format(sum(signal_day)))

profit = [10]
ratios = [1] + [prices[i] / prices[i - 1] for i in range(1, len(prices)) ]
signal_day = [0] + signal_day[:-1]
for s, r in zip(signal_day[1:], ratios[1:]):
    profit.append(profit[-1] * r + s)

print("回測結果: {:f}".format(profit[-1]))
interest_ratio = 0.011 * 240/365
total = 10 + sum(signal_day)
print("{} 萬定存結果 (利率 1.1%): {:f}".format(total, total * (1 + interest_ratio)))



start = 0
for t in range(sum(signal_day)):
    index = signal_day.index(1,start)
    start = index + 1
    print("K值大於D值的日期:", date[index])

            

起始日期: 2019-01-23 (收盤價: 73.7), 結束日期: 2020-01-20 (收盤價: 97.7) (240 天)
本金 10 萬元. 期間有 4 次買進訊號, 一次投入 1 萬元
回測結果: 18.224514
14 萬定存結果 (利率 1.1%): 14.101260
K值大於D值的日期: 2019-05-27
K值大於D值的日期: 2019-05-31
K值大於D值的日期: 2019-08-13
K值大於D值的日期: 2019-08-15
