In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import pymysql
import csv

In [2]:
# Functions

def reverseData(data):
    """Return the elements of `data` in reverse order.

    The reversal is done with a negative-step slice, so `data` must support
    slicing; the argument itself is not modified.
    """
    return data[::-1]

def getRound(data, decimals = 3):
    """Round `data` to `decimals` decimal places (3 by default) using NumPy."""
    rounded = np.around(data, decimals = decimals)
    return rounded

def FindTheBuyPoint(a, b, mode, threshold = 80):
    """Return the positions where curve `a` crosses upward through a reference.

    Parameters
    ----------
    a : 1-D numeric array or pandas Series.
    b : second curve, same length as `a`. Ignored when `mode == "rsi"`.
    mode : "rsi" compares `a` against the constant `threshold`; any other
        value compares `a` against `b` (intersection of the two curves).
    threshold : level used in "rsi" mode (default 80).

    Returns
    -------
    1-D integer array of positions `i` where sign(a - reference) increases
    between `i` and `i + 1`.

    Because the sign is 0 when the two values are exactly equal, a crossing
    that lands exactly on the reference is reported on both adjacent steps.
    """
    reference = threshold if mode == "rsi" else b
    side = np.sign(a - reference)
    # np.diff drops one element, so index i describes the step i -> i + 1.
    return np.flatnonzero(np.diff(side) > 0)

def FindTheSellPoint(a, b, mode, threshold = 20):
    """Return the positions where curve `a` crosses downward through a reference.

    Parameters
    ----------
    a : 1-D numeric array or pandas Series.
    b : second curve, same length as `a`. Ignored when `mode == "rsi"`.
    mode : "rsi" compares `a` against the constant `threshold`; any other
        value compares `a` against `b`.
    threshold : level used in "rsi" mode (default 20).

    Returns
    -------
    1-D integer array of positions `i` where sign(a - reference) decreases
    between `i` and `i + 1`.

    Because the sign is 0 when the two values are exactly equal, a crossing
    that lands exactly on the reference is reported on both adjacent steps.
    """
    reference = threshold if mode == "rsi" else b
    side = np.sign(a - reference)
    # np.diff drops one element, so index i describes the step i -> i + 1.
    return np.flatnonzero(np.diff(side) < 0)

def GetDataFromDB(ip, userName, psw, tableName, sql):
    """Run `sql` against a MySQL server and return every fetched row.

    Parameters
    ----------
    ip : host name or address of the MySQL server.
    userName, psw : credentials used to log in.
    tableName : forwarded to pymysql as the database (schema) to select,
        despite the parameter's name.
    sql : statement to execute.

    Returns
    -------
    The value of `cursor.fetchall()`, or an empty list when connecting or
    querying fails (the failure is printed, not raised).
    """
    results = []

    # Keyword arguments: recent PyMySQL releases no longer accept the
    # connection settings positionally.
    try:
        db = pymysql.connect(host = ip, user = userName, password = psw, database = tableName)
    except Exception as exc:
        print ("Error: unable to connect to DB", exc)
        # Without a connection there is nothing to query or close.
        return results

    try:
        with db.cursor() as cursor:
            cursor.execute(sql)
            results = cursor.fetchall()
    except Exception as exc:
        print ("Error: unable to fetch data from DB", exc)
    finally:
        # Always release the connection, whether or not the query succeeded.
        db.close()

    return results

def SMA(data, period):
    """Simple moving average of a pandas Series over a `period`-sized window.

    Returns the rolling mean as a Series. When `data` is not exactly a
    pandas Series, an error is printed and None is returned.
    """
    if type(data) != pd.Series:
        print ("Error: Wrong input, SMA(pandas.core.series.Series, integer)")
        return None

    window = data.rolling(window = period)
    return window.mean()
        
def EMA(data, period):
    """Exponential moving average of a pandas Series with span `period`.

    Uses the recursive (adjust = False) form of pandas' exponentially
    weighted mean. When `data` is not exactly a pandas Series, an error is
    printed and None is returned.
    """
    if type(data) != pd.Series:
        print ("Error: Wrong input, EMA(pandas.core.series.Series, integer)")
        return None

    weighted = data.ewm(span = period, adjust = False)
    return weighted.mean()
        
def _wilder_average(values, period):
    # SMA-seeded recursive average: avg[i] = (avg[i-1] * (period-1) + x[i]) / period,
    # rounded to 3 decimals at every step.
    seeded = values.rolling(window = period).mean().fillna(0)
    smooth = seeded.to_numpy(dtype = float, copy = True)
    raw = values.to_numpy(dtype = float)
    # `values` comes from a diff, so position 0 is NaN and the first valid
    # rolling mean (the seed) sits at position `period`. Smoothing therefore
    # starts one step later so the seed is kept instead of being overwritten.
    for i in range(period + 1, len(smooth)):
        smooth[i] = np.around((smooth[i - 1] * (period - 1) + raw[i]) / period, 3)
    return pd.Series(smooth, index = values.index, name = values.name)


def RSI(data, period):
    """Relative Strength Index of a price Series.

    Parameters
    ----------
    data : pandas Series of prices.
    period : integer look-back length.

    Returns
    -------
    pandas Series with the same index and name as `data`, holding
    100 * avg_gain / (avg_gain + avg_loss) rounded to 2 decimals. Positions
    where both averages are zero (including the warm-up rows before the
    first full window) are reported as 0.

    When `data` is not a pandas Series, an error is printed and None is
    returned.
    """
    if not isinstance(data, pd.Series):
        print ("Error: Wrong input, RSI(pandas.core.series.Series, integer)")
        return None

    delta = data.diff()
    # Split the price changes into non-negative gains and loss magnitudes;
    # the leading NaN produced by diff() is preserved in both.
    gains = delta.clip(lower = 0)
    losses = delta.clip(upper = 0).abs()

    avg_gain = _wilder_average(gains, period)
    avg_loss = _wilder_average(losses, period)

    rsi = 100 * (avg_gain / (avg_gain + avg_loss))
    # 0 / 0 yields NaN while both averages are still zero; report those as 0.
    rsi = rsi.fillna(0)
    return np.around(rsi, 2)
        
def _seeded_ema(values, span):
    # Exponential average with alpha = 2 / (span + 1), seeded from the simple
    # moving average; rows before the first full window are 0.
    seeded = values.rolling(window = span).mean().fillna(0)
    smooth = seeded.to_numpy(dtype = float, copy = True)
    raw = values.to_numpy(dtype = float)
    # NOTE(review): the first full window ends at position span - 1, yet the
    # recursion starts at span + 1, so positions span - 1 and span both hold
    # plain SMA values. Kept as-is to preserve existing results - confirm intent.
    for i in range(span + 1, len(smooth)):
        smooth[i] = (smooth[i - 1] * (span - 1) + raw[i] * 2.0) / (span + 1)
    return pd.Series(smooth, index = values.index, name = values.name)


def MACD(data, period = None):
    """Compute the MACD difference line and its signal line.

    Parameters
    ----------
    data : pandas DataFrame with 'High', 'Low' and 'Close' columns.
    period : sequence of three integers: fast span, slow span, signal span.

    Returns
    -------
    dict with
        'dif' : fast average minus slow average of the demand index
                (High + Low + 2 * Close) / 4,
        'dem' : SMA-seeded exponential average of 'dif',
    both as pandas Series aligned with `data`.

    On invalid input an error describing the offending argument is printed
    and None is returned.
    """
    if not isinstance(data, pd.DataFrame):
        print ("Error: Wrong input, MACD(pandas.core.frame.DataFrame, list of integer)")
        return None
    if period is None or len(period) != 3:
        print ("Error: number of content in list do not equal to 3")
        return None

    fast, slow, signal = period
    di = (data['High'] + data['Low'] + 2.0 * data['Close']) / 4.0

    dif = _seeded_ema(di, fast) - _seeded_ema(di, slow)
    dem = _seeded_ema(dif, signal)
    return {'dif':dif, 'dem':dem}
            
def KD(data, result = None):
    """Compute the 9-period stochastic K and D lines.

    Parameters
    ----------
    data : pandas DataFrame with the columns '收盤價(元)' (close),
        '最高價(元)' (high) and '最低價(元)' (low). An 'RSV' column is
        added to (or overwritten in) this frame as a side effect.
    result : optional dict with 'K9' and 'D9' lists used as the starting
        values; new values are appended to those lists in place. When
        omitted or empty, both lines start from [0].
        NOTE(review): assumes a supplied seed holds one starting value per
        list - confirm against callers.

    Returns
    -------
    dict {'K9': [...], 'D9': [...]} with one appended value per row of
    `data` after the first, each rounded to 5 decimals.
    """
    lowest = data['最低價(元)'].rolling(window = 9).min()
    highest = data['最高價(元)'].rolling(window = 9).max()

    # Raw stochastic value on a 0..1 scale; rows without a full 9-row window
    # (and 0 / 0 cases where high == low) are reported as 0.
    rsv = ((data['收盤價(元)'] - lowest) / (highest - lowest)).fillna(0)
    data['RSV'] = rsv

    if not result:
        result = {
            'K9':[0],
            'D9':[0]
        }
    k_values = result['K9']
    d_values = result['D9']

    # Positional iteration keeps the recursion independent of the frame's
    # index labels. The first row only provides the starting point.
    for rsv_today in rsv.to_numpy()[1:]:
        k_today = np.around((1.0/3.0) * rsv_today + (2.0 / 3.0) * k_values[-1], 5)
        k_values.append(k_today)
        d_today = np.around((2.0/3.0) * d_values[-1] + (1.0 / 3.0) * k_today, 5)
        d_values.append(d_today)

    return result

In [3]:
# Load the price table from the Big5-encoded CSV file.
etf_csv_path = r"C:\Users\vm06v\Dropbox\taetfp.csv"
etfPriceData = pd.read_csv(etf_csv_path, encoding="big5")

In [4]:
# kd: K9 / D9 lines for every row of the price table
kd_lines = pd.DataFrame(KD(etfPriceData))

# positions where sign(K9 - D9) steps up (long) or steps down (short)
KD_long = FindTheBuyPoint(kd_lines['K9'], kd_lines['D9'], "kd")
KD_short = FindTheSellPoint(kd_lines['K9'], kd_lines['D9'], "kd")

# one single-element row per price row: 1 = long, -1 = short, 0 = no signal
kd_signal = np.zeros(len(kd_lines), dtype = int)
kd_signal[KD_long] = 1
kd_signal[KD_short] = -1
KD_result = [[flag] for flag in kd_signal.tolist()]

In [23]:
# rsi: 14-period RSI of the closing-price column, relabelled as 'rsi'
rsi_frame = pd.DataFrame(RSI(etfPriceData['收盤價(元)'], 14))
rsi_frame = rsi_frame.rename(columns={'收盤價(元)': 'rsi'})

# positions where the RSI crosses its buy / sell levels
rsi_long = FindTheBuyPoint(rsi_frame['rsi'], [], "rsi")
rsi_short = FindTheSellPoint(rsi_frame['rsi'], [], "rsi")

# one single-element row per price row: 1 = long, -1 = short, 0 = no signal
rsi_signal = np.zeros(len(rsi_frame), dtype = int)
rsi_signal[rsi_long] = 1
rsi_signal[rsi_short] = -1
rsi_result = [[flag] for flag in rsi_signal.tolist()]

In [24]:
# Write the RSI long/short flags to indicator.csv, one row per price row.
csvHeader = ['RSI']
# The context manager closes the file when the block ends, so every buffered
# row is flushed to disk even if a write raises.
with open(r"C:\Users\vm06v\OneDrive\桌面\indicator.csv", "w", newline = '') as file:
    cursor = csv.writer(file)  # csv writer object (not a database cursor)
    cursor.writerow(csvHeader)
    cursor.writerows(rsi_result)