In [None]:
# Package imports
import matplotlib
import numpy as np
import matplotlib.pyplot as plt
import csv
import yfinance
import pandas as pd
from datetime import timedelta
from scipy.signal import argrelextrema
from collections import defaultdict
import warnings
import time
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:70% !important; }</style>"))
%config InlineBackend.figure_format = 'retina'
from tqdm import tqdm
matplotlib.rcParams['figure.figsize'] = (8, 5)
%matplotlib inline
%pylab inline

In [None]:
def get_data(symbol, lookback):
    all_data = pd.DataFrame()
    endDate = pd.to_datetime('today')
    startDate = endDate - datetime.timedelta(days = lookback)
    all_data = yfinance.download(symbol , startDate , endDate , interval= '1m')
    all_data.drop(columns=['Volume', 'Adj Close'], inplace=True)
    all_data.index.name = 'timestamp'
    all_data.replace(0, method='bfill', inplace=True)
    return all_data
        
def get_stock_data(stocklist, lookback):
    stock_data = {}
    for stock in tqdm(stocklist, desc='Getting stock data'):
        try:
            stock_data[stock] = get_data(stock, lookback)
        except Exception as e:
            print('Exception {} {}'.format(stock, e))
    return stock_data
def resample(stock_data, rate):
    for stock in stock_data:
        stock_data[stock] = stock_data[stock].resample(rate, closed='right', label='right').agg({'Open': 'first',
                                                                                                 'High': 'max',
                                                                                                 'Low': 'min',
                                                                                                 'Close': 'last'}).dropna()
    return stock_data


In [None]:
def get_max_min(data,smoothing,window_range):
    smooth_prices = data['Close'].rolling(window=smoothing).mean().dropna()
    local_max = argrelextrema(smooth_prices.values, np.greater)[0]
    local_min = argrelextrema(smooth_prices.values, np.less)[0]
    price_local_max_dt = []
    for i in local_max:
        if (i>window_range) and (i<len(data)-window_range):
            price_local_max_dt.append(data.iloc[i-window_range:i+window_range]['Close'].idxmax())
    price_local_min_dt = []
    for i in local_min:
        if (i>window_range) and (i<len(data)-window_range):
            price_local_min_dt.append(data.iloc[i-window_range:i+window_range]['Close'].idxmin())
    maxima = pd.DataFrame(data.loc[price_local_max_dt])
    minima = pd.DataFrame(data.loc[price_local_min_dt])
    max_min = pd.concat([maxima, minima]).sort_index()
    max_min.index.name = 'date'
    max_min = max_min.reset_index()
    max_min = max_min[~max_min.date.duplicated()]
    p = data.reset_index()   
    max_min['day_num'] = p[p['timestamp'].isin(max_min.date)].index.values
    max_min = max_min.set_index('day_num')['Close']
    return max_min

In [None]:
def find_patterns(max_min):
    patterns = defaultdict(list)
    # Window range is 5 units
    for i in range(5, len(max_min)):  
        window = max_min.iloc[i-5:i]
        
        # Pattern must play out in less than n units
        if window.index[-1] - window.index[0] > 100:      
            continue   
            
        a, b, c, d, e = window.iloc[0:5]
        
        #Head & Shoulder
        if a>b and c>a and c>e and c>d and abs(b-d)<=np.mean([b,d])*0.02:
            patterns['HS'].append((window.index[0], window.index[-1]))
                
        # Inverted Head & Shoulder
        #if a<b and c<a and c<e and c<d and e<d and abs(b-d)<=np.mean([b,d])*0.02:
            #patterns['IHS'].append((window.index[0], window.index[-1]))
        
    return patterns

In [None]:
def plot_minmax_patterns(prices, max_min, patterns, stock, window, ema):
    
    incr = str((prices.index[1] - prices.index[0]).seconds/60)
    
    if len(patterns) == 0:
        pass
    else:
        num_pat = len([x for x in patterns.items()][0][1])
        f, axes = plt.subplots(1, 2, figsize=(16, 5))
        axes = axes.flatten()
        prices_ = prices.reset_index()['Close']
        axes[0].plot(prices_)
        axes[0].scatter(max_min.index, max_min, s=100, alpha=.3, color='orange')
        axes[1].plot(prices_)
        for name, end_day_nums in patterns.items():
            for i, tup in enumerate(end_day_nums):
                sd = tup[0]
                ed = tup[1]
                axes[1].scatter(max_min.loc[sd:ed].index,
                              max_min.loc[sd:ed].values,
                              s=200, alpha=.3)
                plt.yticks([])
        plt.tight_layout()
        plt.title('{}: {}: EMA {}, Window {} ({} patterns)'.format(stock, incr, ema, window, num_pat))
def get_results(prices, max_min, pat, stock, ema_, window_):
    
    incr = str((prices.index[1] - prices.index[0]).seconds/60)
    
    #fw_list = [1, 12, 24, 36] 
    fw_list = [1, 2, 3]
    results = []
    if len(pat.items()) > 0:
        end_dates = [v for k, v in pat.items()][0]      
        for date in end_dates:  
            param_res = {'stock': stock,
                         'increment': incr,
                         'ema': ema_,
                         'window': window_, 
                         'date': date}
            for x in fw_list:
                returns = (prices['Close'].pct_change(x).shift(-x).reset_index(drop=True).dropna())
                try:
                    param_res['fw_ret_{}'.format(x)] = returns.loc[date[1]]   
                except Exception as e:
                    param_res['fw_ret_{}'.format(x)] = e
            results.append(param_res)  
    else:
        param_res = {'stock': stock,
                     'increment': incr,
                     'ema': ema_,
                     'window': window_,
                     'date': None}
        for x in fw_list:
            param_res['fw_ret_{}'.format(x)] = None   
        results.append(param_res)
    return pd.DataFrame(results)
def screener(stock_data, ema_list, window_list, plot, results):
    
    all_results = pd.DataFrame()
    
    for stock in stock_data:
        prices = stock_data[stock]
        
        for ema_ in ema_list:
            for window_ in window_list: 
                max_min = get_max_min(prices, smoothing=ema_, window_range=window_)
                pat = find_patterns(max_min)
                
                if plot == True:
                    plot_minmax_patterns(prices, max_min, pat, stock, window_, ema_)
                    
                if results == True:
                    all_results = pd.concat([all_results, get_results(prices, max_min, pat, stock, ema_, window_)], axis=0)
                
    if results == True:
        return all_results.reset_index(drop=True)

In [None]:
stocklist = ['AAPL', 'AMZN', 'FB', 'NFLX', 'GOOG']
stock_data = get_stock_data(stocklist, 5)

In [None]:
ema_list = [3, 5, 7, 10, 20, ]
window_list = [3, 5, 7, 10, 20 ]
resampled_stock_data = stock_data.copy()
resampled_stock_data = resample(resampled_stock_data, '5T')
results = screener(resampled_stock_data, ema_list, window_list, plot=True, results=True)

In [None]:
# Examples of reading the results.

for x in [x for x in results.columns if x not in ['stock', 'date']]:
    results[x] = pd.to_numeric(results[x], errors='coerce')
    
print('Average results by window param')
display(round(results.groupby('window').mean(), 3).sort_values(by=[x for x in results.columns if 'fw' in x], ascending=False))
print('\n', 'Average results by ema param')
display(round(results.groupby('ema').mean(), 3).sort_values(by=[x for x in results.columns if 'fw' in x], ascending=False))
print('\n', 'Average results by stock')
display(round(results.groupby('stock').mean(), 3).sort_values(by=[x for x in results.columns if 'fw' in x], ascending=False))
print('\n', 'Results sorted by return')
display(round(results.sort_values(by=[x for x in results.columns if 'fw' in x], ascending=False), 3))

In [None]:
ema_list = [7]
window_list = [7]
resampled_stock_data = stock_data.copy()
resampled_stock_data = resample(resampled_stock_data, '5T')
results = screener(resampled_stock_data, ema_list, window_list, plot=True, results=True)