In [None]:
import ffn
%matplotlib inline
import numpy as np
import pandas as pd
import pandas_datareader.data as web
from datetime import datetime
import pickle
import talib
from talib import MA_Type
import requests
import io

In [None]:
# 讀出預先下載好的股價資料
with open('stockdata', 'rb') as f:
    data = pickle.load(file=f)

# Q1. 畫出 Diamond Shape

In [None]:
def diamond(n):
    for i in range(n):
        print(" " *(n-i) + "*" * (2*i+1))
    for j in range(n-2,-1,-1):
        print(" " *(n-j) + "*" * (2*j+1))    

In [None]:
diamond(5)

# Q2. 刪除重複性資料

從 symbols01.txt 跟 symbols02.txt 讀入兩組股票代號，合併兩組代號，輸出一份無重複的股票代號到 symbols03.txt。

In [None]:
with open('symbols01.txt','r') as f1:
    symbols_01=f1.read().strip().split("\n")
    set_01=set(symbols_01)

In [None]:
with open('symbols02.txt','r') as f2:
    symbols_02=f2.read().strip().split("\n")
    set_02=set(symbols_02)

In [None]:
symbols_03 = list(set_01|set_02)

In [None]:
symbols_03

In [None]:
with open('symbols03.txt','w') as f3:
    for i in symbols_03:
        f3.write(i.strip().upper()+"\n")
      

# Q3. 下載台灣上市公司基本資料

寫一個程式從 [http://dts.twse.com.tw/opendata/t187ap03_L.csv](http://dts.twse.com.tw/opendata/t187ap03_L.csv) 下載台灣上市公司基本資料，並輸出成 pandas dataframe。

In [None]:
r = requests.get("http://dts.twse.com.tw/opendata/t187ap03_L.csv")
r.encoding = "big5"
q3 = io.StringIO(r.text)
Q3 = pd.read_csv(q3, skiprows=1)

In [None]:
r = requests.get("http://dts.twse.com.tw/opendata/t187ap03_L.csv")
r.encoding = "big5"

In [None]:
q3 = io.StringIO(r.text)
Q3 = pd.read_csv(q3, skiprows=1)
Q3

# Q4. 畫出技術曲線

請畫出除了上課提過的線型之外的任何技術曲線，並寫註解說明。

In [None]:
df = data['GOOG']

In [None]:
df

In [None]:
# 計算均線
df['5d'] = pd.Series.rolling(df['Close'], window=5).mean()
df['20d'] = pd.Series.rolling(df['Close'], window=20).mean()
df['60d'] = pd.Series.rolling(df['Close'], window=60).mean()

In [None]:
# 計算 RSV(9天)
df['RSV'] = 100*( (df['Close'] - df['Low'].rolling(window=9).min())/
                 (df['High'].rolling(window=9).max() - df['Low'].rolling(window=9).min()) )


In [None]:
#計算 K(9天)
df['K']= np.nan
df['K'][7] = 50
for i in range(8,len(df)):
    K_value = (1/3) * df['RSV'][i] + (2/3) *df['K'][i-1]
    df['K'][i] = K_value

In [None]:
#計算 D(9天)
df['D']= np.nan
df['D'][7] = 50
for i in range(8,len(df)):
    D_value = (1/3) * df['K'][i] + (2/3) *df['D'][i-1]
    df['D'][i] = D_value

In [None]:
#計算 J(9天)
df['J']=3*df['D']-2*df['K']
df

In [None]:
df[['Close','5d','20d','60d']].plot(legend='best', figsize=(10,8))

In [None]:
df[['K','D','J']].plot(legend='best', figsize=(10,8))

# Q5. 策略回測

In [None]:
# 計算 MaxDD
def DrawDownAnalysis(cumRet):
    dd_series = ffn.core.to_drawdown_series(cumRet)
    dd_details = ffn.core.drawdown_details(dd_series)
    return dd_details['drawdown'].min(), dd_details['days'].max()

In [None]:
# 利用策略產生的持有部位資訊，計算底下四個指標來判斷投資績效
# sharpe ratio: 判斷報酬的好壞跟穩定度，數值越大越好
# maxdd: maximum drawdown, 最糟糕的狀況會賠幾 %
# maxddd: maximum drawdown duration, 低於上一次最高報酬的天數
# cumRet[-1]: 最後賺的 % 數

def indicators(df):
    dailyRet = df['Close'].pct_change()
    excessRet = (dailyRet - 0.04/252)[df['positions'] == 1]
    SharpeRatio = np.sqrt(252.0)*np.mean(excessRet)/np.std(excessRet)
    
    cumRet = np.cumprod(1+excessRet)
    
    maxdd, maxddd = DrawDownAnalysis(cumRet)
    
    return SharpeRatio, maxdd, maxddd, cumRet[-1]

In [None]:
# 這是我們的策略的部分
# 主要只是要算出進出的訊號 signals 跟何時持有部位 positions
# 底下是一個突破系統的範例

def Breakout_strategy(df):
    # Donchian Channel
    df['20d_high'] = np.round(pd.Series.rolling(df['Close'], window=20).max(), 2)
    df['10d_low'] = np.round(pd.Series.rolling(df['Close'], window=10).min(), 2)

    has_position = False
    df['signals'] = 0
    for t in range(2, df['signals'].size):
        if df['Close'][t] > df['20d_high'][t-1]:
            if not has_position:
                df.loc[df.index[t], 'signals'] = 1
                has_position = True
        elif df['Close'][t] < df['10d_low'][t-1]:
            if has_position:
                df.loc[df.index[t], 'signals'] = -1
                has_position = False

    df['positions'] = df['signals'].cumsum().shift()
    return df

In [None]:
def RSI_7030_strategy(df):
    df['RSI'] = talib.RSI(df['Close'].values)
    
    has_position = False
    df['signals'] = 0
    for t in range(2, df['signals'].size):
        if df['RSI'][t-1] < 30:
            if not has_position:
                df.loc[df.index[t], 'signals'] = 1
                has_position = True
        elif df['RSI'][t-1] > 70:
            if has_position:
                df.loc[df.index[t], 'signals'] = -1
                has_position = False

    df['positions'] = df['signals'].cumsum().shift()
    return df

In [None]:
def BBands_strategy(df):
    df['UBB'], df['MBB'], df['LBB'] = talib.BBANDS(df['Close'].values, matype=MA_Type.T3)

    has_position = False
    df['signals'] = 0
    for t in range(2, df['signals'].size):
        if df['Close'][t] < df['LBB'][t-1]:
            if not has_position:
                df.loc[df.index[t], 'signals'] = 1
                has_position = True
        elif df['Close'][t] > df['UBB'][t-1]:
            if has_position:
                df.loc[df.index[t], 'signals'] = -1
                has_position = False

    df['positions'] = df['signals'].cumsum().shift()
    return df

In [None]:
# 定義你的策略
# 進場條件 : 當 K > D 且 K, D 皆 < 20  黃金交叉 買進
# 出場條件 : 當 D < K 且 K, D 皆 < 80  死亡交叉 賣出

def M064030046_strategy(df):
    # 計算 RSV(9天)
    df['RSV'] = 100*( (df['Close'] - df['Low'].rolling(window=9).min())/
                     (df['High'].rolling(window=9).max() - df['Low'].rolling(window=9).min()) )

    #計算 K(9天)
    df['K']= np.nan
    df['K'][7] = 50
    for i in range(8,len(df)):
        K_value = (1/3) * df['RSV'][i] + (2/3) *df['K'][i-1]
        df['K'][i] = K_value
    #計算 D(9天)
    df['D']= np.nan
    df['D'][7] = 50
    for i in range(8,len(df)):
        D_value = (1/3) * df['K'][i] + (2/3) *df['D'][i-1]
        df['D'][i] = D_value        
    
    has_position = False
    df['signals'] = 0
    for t in range(2, df['signals'].size):
        if( (df['K'][t] > df['D'][t]) & (df['K'][t]<20) & (df['D'][t]<20) ):
            if not has_position:
                df.loc[df.index[t], 'signals'] = 1
                has_position = True
        elif ( (df['K'][t] < df['D'][t]) & (df['K'][t]>80) & (df['D'][t]>80) ):
            if has_position:
                df.loc[df.index[t], 'signals'] = -1
                has_position = False    
   
    df['positions'] = df['signals'].cumsum().shift()
    return df

In [None]:
def apply_strategy(strategy, df):
    return strategy(df)

In [None]:
# 計算各支股票的回測結果
results = []

#strategies = [Breakout_strategy, RSI_7030_strategy, BBands_strategy, 學號_strategy]
strategies = [Breakout_strategy, RSI_7030_strategy, BBands_strategy,  M064030046_strategy]

for symbol in data:
    for strategy in strategies:
        try:
            apply_strategy(strategy, data[symbol])
            if np.all(data[symbol]['signals']==0):
                print("Symbol:", symbol, "使用", strategy.__name__, "策略沒有出現買賣訊號。")
                continue
            SharpeRatio, maxdd, maxddd, finalRet = indicators(data[symbol])
            days = (data[symbol].index[-1] - data[symbol].index[0]).days
            results.append((SharpeRatio, maxdd, maxddd, finalRet, days,
                            data[symbol][data[symbol]['signals'] > 0]['signals'].sum(), symbol, strategy.__name__))
        except Exception as e:
            print("Error occurs at symbol:", symbol, "Strategy:", strategy.__name__, "==>", e.args)

In [None]:
results_df = pd.DataFrame(results, columns=['sharpe','MaxDrawDown','MaxDrawDownDuration','returns', 
                                            'days', 'entries','symbol','strategy'])

In [None]:
results_df.head()

In [None]:
results_df.sort_values('MaxDrawDown',ascending=False).head()

In [None]:
results_df.sort_values('returns',ascending=False).head()