## Requirements

In [4]:
import requests
import pandas as pd
import datetime
from datetime import timedelta
import pytz
import numpy as np
import matplotlib.pyplot as plt 
import os

api_key = os.environ.get('EOD_Historical_Data_API_Key')

## ETL Functions

In [2]:
def get_daily_prices(symbol: str, start_date: str, end_date: str = None, adjusted: bool = True):

    # Set last date of dataset to today unless assigned during function call
    if end_date is None:
        end_date = datetime.datetime.now().strftime("%Y-%m-%d")

    # Validate Start & End Date 
    # Convert strings to datetime objects
    start_date_dtObj = datetime.datetime.strptime(start_date, "%Y-%m-%d")
    end_date_dtObj = datetime.datetime.strptime(end_date, "%Y-%m-%d")
    # Validate
    start_date_dtObj = min(start_date_dtObj, end_date_dtObj)
    end_date_dtObj = max(start_date_dtObj, end_date_dtObj)

    #OHLCV
    # API Request to return Daily OHLCV in Pandas DataFrame
    api_url = f'https://eodhistoricaldata.com/api/eod/{symbol}.US?from={start_date_dtObj}&to={end_date_dtObj}&api_token={api_key}&fmt=json'
    data = requests.get(api_url).json()
    df = pd.DataFrame(data)
    # Add column to DataFrame containing Symbol
    df.insert(loc=0, column='symbol', value=symbol)
    # Format date column as datetime, and set as DataFrame index
    df['date'] = pd.to_datetime(df['date'])
    df = df.set_index("date", drop=False)

    if adjusted:
        # Splits
        # API Request to return historical Splits in Pandas DataFrame
        api_url = f'https://eodhistoricaldata.com/api/splits/{symbol}.US?api_token={api_key}&from={start_date_dtObj}&fmt=json'
        data = requests.get(api_url).json()
        splits = pd.DataFrame(data)
        # Check if the DataFrame is emtpty 
        # If the DataFrame is empty, create a placeholder DF that will be used to hold split factor of 1
        if len(splits.index) == 0:
            splits = pd.DataFrame(data={'split': 1}, index=[start_date_dtObj])
        # If the DataFrame is not empty, format the DataFrame to hold split factors
        else:
        # Format Splits for function adjustedPrice 
            splits['split'] = splits.eval(splits['split'])
            splits['split'] = 1 / splits['split']
            # Format date column as datetime, and set as DataFrame index
            splits['date'] = pd.to_datetime(splits['date'])
            splits.set_index("date", drop=True, inplace=True)

        # Dividends
        # API Request to return historical Dividends in Pandas DataFrame
        api_url = f'https://eodhistoricaldata.com/api/div/{symbol}.US?api_token={api_key}&from={start_date_dtObj}&fmt=json'
        data = requests.get(api_url).json()
        dividends = pd.DataFrame(data)
        # Check if the DataFrame is emtpty 
        # If the DataFrame is empty, create a placeholder DF that will be used to hold dividend amount of 0
        if len(dividends.index) == 0:
            dividends = pd.DataFrame(data={'unadjustedValue': 0}, index=[start_date_dtObj])
        # If the DataFrame is not empty, format the DataFrame to hold dividend amounts
        else:
        # Drop unnecessary columns
            dividends_keep_col = ['date', 'unadjustedValue']
            dividends = dividends[dividends_keep_col]
            # Format date column as datetime, and set as DataFrame index
            dividends['date'] = pd.to_datetime(dividends['date'])
            dividends.set_index("date", drop=True, inplace=True)

        # Merge Splits & Dividends to OHLCV DataFrame
        df = df.join(splits).fillna(1)  # Fill NaN values with a default split factor of 1.
        df = df.join(dividends).fillna(0)  # Fill NaN values with a default dividend amount of 0.

        # Drop existing adjusted_close
        df.drop(['adjusted_close'], axis=1, inplace=True)

        # Add columns for Adjusted OHLC
        adjusted_daily_price(df, 'open')
        adjusted_daily_price(df, 'high')
        adjusted_daily_price(df, 'low')
        adjusted_daily_price(df, 'close')

        # Drop split and dividends columns
        remove_col = ['split', 'unadjustedValue']
        df.drop(remove_col, axis=1, inplace=True)

    return df

In [1]:
def get_intraday_prices(symbol: str, start_date: str, end_date: str = None, interval: int = 2, adjusted: bool = True):

    # Define API max period in days
    API_MAX_PERIOD = 120
    # Define number of minutes starting from 00:00 up to the following day
    EOD_MINUTES = 60 * 24 - 1
    # Define number of minutes in trading day (normal market hours)
    TRADING_DAY_MINUTES = int(60 * 6.5)

    # Set last date of dataset to today unless assigned during function call
    if end_date is None:
        end_date = datetime.datetime.now().strftime("%Y-%m-%d")

    # Validate Start & End Date 
    # Convert strings to datetime objects
    start_date_dtObj = datetime.datetime.strptime(start_date, "%Y-%m-%d")
    end_date_dtObj = datetime.datetime.strptime(end_date, "%Y-%m-%d")
    # Validate
    start_date_dtObj = min(start_date_dtObj, end_date_dtObj)
    end_date_dtObj = max(start_date_dtObj, end_date_dtObj)

    # Calculate n-number of Date Ranges with a maximum 120-day period
    # Create a date table containing all of the dates between the start and end dates defined during function call
    date_table = pd.date_range(start=start_date, end=end_date, tz='US/Eastern').to_frame(index=False, name='date')
    # Add a column to the date table that will be used to group date ranges with a maximum of 120 days bewteen the first and last date
    date_table['date_range'] = -(-(date_table.index.to_numpy() + 1) // API_MAX_PERIOD)  # Upside-down floor division to convert floor to ceiling with negation. 
    # Create a list of all of the 120-day date ranges that make up the duration defined during function call
    date_ranges = date_table.groupby('date_range')['date'].apply(list)
    # Define number of iterations for For Loop
    number_of_date_ranges = np.max(date_table['date_range'])
    # Convert the Date Ranges to only two values: the start and end datetime values, converted to Unix UTC, and perform API call
    # Create an empty list to hold the DataFrame results of each date range.
    dfs = []
    for date_range in range(1, number_of_date_ranges + 1):  # For Loop for n-number of Date Ranges
        date_range_start_date = np.min(date_ranges[date_range])  # Start of Date Range
        date_range_end_date = np.max(date_ranges[date_range]) + timedelta(minutes=EOD_MINUTES)  # End of Date Range
        utc_tz = pytz.timezone('UTC')  # Define UTC timezone
        date_range_start_date_utc = utc_tz.normalize(date_range_start_date)  # Convert date range start date to UTC timezone
        date_range_end_date_utc = utc_tz.normalize(date_range_end_date)  # Convert date range end date to UTC timezone
        date_range_start_date_unix_utc = int(date_range_start_date_utc.timestamp())  # Convert date range start date to Unix UTC
        date_range_end_date_unix_utc = int(date_range_end_date_utc.timestamp())  # Convert date range end date to Unix UTC
        # API Request to return Pandas DataFrame
        api_url = f'https://eodhistoricaldata.com/api/intraday/{symbol}.US?api_token={api_key}&interval=1m&fmt=json&from={date_range_start_date_unix_utc}&to={date_range_end_date_unix_utc}'
        data = requests.get(api_url).json()
        df = pd.DataFrame(data)
        dfs.append(df)  # Append DataFrames
    # Return a single DataFrame conatining all Date Ranges
    df = pd.concat(dfs) 

    # Sort and reset index to verify chronological order
    df.sort_values(by='timestamp', ascending=True, inplace=True)
    df.reset_index(drop=True, inplace=True)

    # Add column: convert Unix UTC timestamp to Eastern timezone
    unix_utc = pd.to_datetime(df['timestamp'], unit='s') 
    df['datetime_est'] = unix_utc.dt.tz_localize('UTC')  # UTC timezone aware
    df['datetime_est'] = df['datetime_est'].dt.tz_convert('US/Eastern')  # Convert to Eastern timezone
    df['datetime_est'] = df['datetime_est'].dt.tz_localize(None)  # Return timezone-unaware datetime
    
    # Add column to DataFrame, in 1st position, containing truncated Date (EST)
    df.insert(loc=0, column='date_est', value=df['datetime_est'].dt.date)

    # Set the daily close price to the open of the next row (16:00 est) 
    # ***Intraday Closing Data returned by EOD Histroical Data does not match Daily Closing Data. The Open of the following row (16:00 est) matches***
    close_mask = df['datetime_est'].dt.time == datetime.time(15, 59, 0)
    df['close'] = df['open'].shift(-1).where(close_mask, other=df['close'])

    # Calculate 'Trading Session' for all data points (vectorized)
    # Trading Hours
    market_start = datetime.time(9, 30, 0)
    market_end = datetime.time(16, 0, 0) 
    # Conditions
    conditions = [
                    df['datetime_est'].dt.time < market_start,
                    df['datetime_est'].dt.time >= market_end
                ]
    # Trading Sessions
    values = ['Pre-Market', 'After Horus']
    # Add column for Trading Session
    df['trading_session'] = np.select(conditions, values, default='Market')
   
    # Filter for Market hours
    df = df[df['trading_session'] == 'Market']

    # Define Interval Groups
    position = df.columns.get_loc('datetime_est')
    elapsed = df.iloc[0:, position] - df.iat[0, position]
    minutes_elapsed = (elapsed.dt.seconds/60) + 1
    df['interval_group'] = -(-minutes_elapsed // (TRADING_DAY_MINUTES/interval))  # Upside-down floor division to convert floor to ceiling with negation. 

    # Aggregate Data using Interval Groups
    agg_dict = {
        'open': 'first',
        'high': np.max,
        'low': np.min,
        'close': 'last',
        'volume': np.sum
    }
    df = df.groupby(['date_est', 'interval_group']).agg(agg_dict).reset_index()

    # Add column to DataFrame, in 1st position, containing Symbol
    df.insert(loc=0, column='symbol', value=symbol)

    # Set Index
    df.set_index('date_est', inplace=True)

    if adjusted:
        # Calculate the Adusted OHLC as a percentage on the daily timeframe
        adj_df = get_daily_prices(symbol, start_date, end_date, adjusted=True)
        adjusted_percent(adj_df, 'open')
        adjusted_percent(adj_df, 'high')
        adjusted_percent(adj_df, 'low')
        adjusted_percent(adj_df, 'close')
        # Drop unnecessary columns so the DataFrame only contains percentages
        adj_df = adj_df.loc[:,adj_df.columns.str.endswith('_percent')]
        # Merge Adjusted Percentages to intraday OHLCV DataFrame
        df = df.join(adj_df)

        # Calculate Adjusted Prices
        intraday_adjusted_prices(df, 'open')
        intraday_adjusted_prices(df, 'high')
        intraday_adjusted_prices(df, 'low')
        intraday_adjusted_prices(df, 'close')

    return df

In [None]:
def adjusted_price(df, column):

    """
    https://joshschertz.com/2016/08/27/Vectorizing-Adjusted-Close-with-Python/ 
    Vectorized approach for calculating the adjusted prices for the
    specified column in the provided DataFrame. This creates a new column
    called 'adj_<column name>' with the adjusted prices. This function requires
    that the DataFrame have columns with dividend and split_ratio values.

    :param df: DataFrame with raw prices along with dividend and split_ratio values
    :param column: String of which price column should have adjusted prices created for it
    :return: DataFrame with the addition of the adjusted price column
    """
    
    adj_column = 'adjusted_' + column

    # Reverse the DataFrame order, sorting by date in descending order
    df.sort_index(ascending=False, inplace=True)

    price_col = df[column].to_numpy()
    split_col = df['split'].to_numpy()
    dividend_col = df['unadjustedValue'].to_numpy()
    adj_price_col = np.zeros(len(df.index))
    adj_price_col[0] = price_col[0]

    for i in range(1, len(price_col)):
        adj_price_col[i] = round((adj_price_col[i-1] + adj_price_col[i-1] * (((price_col[i] * split_col[i-1]) - price_col[i-1] - dividend_col[i-1]) / price_col[i-1])), 4)

    df[adj_column] = adj_price_col

    # Change the DataFrame order back to dates ascending
    df.sort_index(ascending=True, inplace=True)

    return df

In [None]:
def adjusted_percent(df, column):

    # Data Validation
    if column.startswith('adjusted_'):
        adj_column = column
        column = column[column.find("_")+1:].split()[0]
    else:
        adj_column = 'adjusted_' + column
        column = column

    price_col = df[column].to_numpy()
    adj_price_col = df[adj_column].to_numpy()
    adj_percent = np.zeros(len(df.index))

    for i in range(0, len(price_col)):
        adj_percent[i] = adj_price_col[i] / price_col[i]

    df['adjusted_' + column + '_percent'] = adj_percent

    return df

def intraday_adjusted_prices(df, column):

    adj_column = 'adjusted_' + column
    adj_percent_column = 'adjusted_' + column + '_percent'

    price_col = df[column].to_numpy()
    adj_percent = df[adj_percent_column].to_numpy()
    adj_price = np.zeros(len(df.index))

    for i in range(0, len(price_col)):
        adj_price[i] = round(adj_percent[i] * price_col[i],4)

    df.drop([adj_percent_column], axis=1, inplace=True)

    df[adj_column] = adj_price

    return df

## Technical Indicators

In [None]:
def sma(df, column: str, period: int):

    # Calculate SMA
    sma = df[column].rolling(window=period, min_periods=period).mean()

    # Add SMA column to DataFrame
    col_header = column + "_sma_" + str(period)
    df[col_header] = sma

    return sma

In [None]:
def ema(df, column: str, period: int):

    # Calculate EMA
    ema = df[column].ewm(span=period, adjust=False, min_periods=period).mean()

    # Add EMA column to DataFrame
    col_header = column + "_ema_" + str(period)
    df[col_header] = ema

    return ema

### Arnaud Legoux Moving Average
Offset: You can set the offset in decimals between the level of 0 and 1. A setting of 0.99 makes the ALMA extremely responsive, while a value of 0.01 makes it very smooth.

Sigma: A setting of 6 makes the filter rather large while a smaller sigma setting makes it more focused. According to Mr. Legoux, a sigma value of 6 is said to offer good performance.

In [1]:
def alma(df, column: str, period: int, offset: float = 0.85, sigma: float = 6.0):

    # Convert column to Numpy Array
    data = df[column].to_numpy()

    # ALMA inputs
    m = np.floor(offset * (period-1))
    s = period / sigma 
    alma = np.zeros(data.shape)
    w_sum = np.zeros(data.shape)

    # Calculate ALMA for each row
    for i in range(len(data)):
        if i < period-1:
            alma[i] = np.nan
        else:
            for j in range(period):
                w = np.exp(-(j-m)*(j-m)/(2*s*s))
                alma[i] += data[i + 1 - period + j] * w
                w_sum[i] += w
            alma[i] = alma[i] / w_sum[i]
 
    # Add ALMA column to DataFrame
    col_header = column + "_alma_" + str(period)
    df[col_header] = alma
    
    return alma

### Bollinger Bands
Short Term: 10 day MA, 1.5 std dev

Medium Term (most commonly used): 20 day MA, 2 std dev

Long Term: 50 day MA, 2.5 std dev

In [None]:
def bollingerBands(df, column: str, ma_type: str = 'alma', period: int = 20, std: int = 2, plot: bool = True):

    data = df[column]
    ma_type = ma_type.lower()

    # Calculate moving average
    if ma_type == 'alma':
        # Use Arnaud Legoux moving average
        ma = alma(df, column, period)
    elif ma_type == 'ema':
        # Use exponential moving average
        ma = ema(df, column, period)
    else :
        # Use simple moving average
        ma = sma(df, column, period)

    # Calculate rolling Standard Deviation
    rstd = data.rolling(window=period, min_periods=period).std()

    # Moving Average column header
    col_header = column + '_' + ma_type + '_' + str(period)

    # Add columns to DataFrame with Bollinger Bands moving average & period in header
    df['bb_upper_' + col_header] = ma + (rstd * std)
    df['bb_lower_' + col_header] = ma - (rstd * std)

    # Show Visual
    if plot:
        plt.style.use('fivethirtyeight')
        plt.rcParams['figure.figsize'] = (20, 10)
        #Determine color of price line depending on dataset trend; green for positive growth, red for negative growth.
        if df.iloc[0][column] > df.iloc[-1][column]: 
            data.plot(label=column, color='#E45756', linewidth=3)
        else:
            data.plot(label=column, color='#54A24B', linewidth=2)
        df['bb_upper_' + col_header].plot(label='bb_upper_' + col_header, linestyle='--', linewidth=1, color='black')
        df[col_header].plot(label=col_header, linestyle='--', linewidth=1.2, color='grey')
        df['bb_lower_' + col_header].plot(label='bb_lower_' + col_header, linestyle='--', linewidth=1, color='black')
        plt.legend(loc='upper left')
        plt.title(df['symbol'].iloc[0].upper())
        plt.show()

    return df


In [None]:
def keltnerChannels(df, column: str, ma_type: str = 'alma', kc_period: int = 20, kc_multiplier: int = 2, atr_period: int = 10, plot: bool = True):

    data = df[column]
    ma_type = ma_type.lower()

    # Calculate moving average
    if ma_type == 'alma':
        # Use Arnaud Legoux moving average
        ma = alma(df, column, kc_period)
    elif ma_type == 'ema':
        # Use exponential moving average
        ma = ema(df, column, kc_period)
    else :
        # Use simple moving average
        ma = sma(df, column, kc_period)

    # Calculate ATR
    kc_atr = atr(df, adjusted=True, ma_type=ma_type, period=atr_period)

    # Moving Average column header
    col_header = column + '_' + ma_type + '_' + str(kc_period)

    # Add columns to DataFrame with Bollinger Bands moving average & period in header
    df['kc_upper_' + col_header] = ma + (kc_atr * kc_multiplier)
    df['kc_lower_' + col_header] = ma - (kc_atr * kc_multiplier)

    # Show Visual
    if plot:
        plt.style.use('fivethirtyeight')
        plt.rcParams['figure.figsize'] = (20, 10)
        #Determine color of price line depending on dataset trend; green for positive growth, red for negative growth.
        if df.iloc[0][column] > df.iloc[-1][column]: 
            data.plot(label=column, color='#E45756', linewidth=3)
        else:
            data.plot(label=column, color='#54A24B', linewidth=2)
        df['kc_upper_' + col_header].plot(label='kc_upper_' + col_header, linestyle='--', linewidth=1, color='black')
        df[col_header].plot(label=col_header, linestyle='--', linewidth=1.2, color='grey')
        df['kc_lower_' + col_header].plot(label='kc_lower_' + col_header, linestyle='--', linewidth=1, color='black')
        plt.legend(loc='upper left')
        plt.title(df['symbol'].iloc[0].upper())
        plt.show()

    return df

In [None]:
def rsi(df, column: str, period: int = 14, ma_type: str = 'alma'):

    delta = df[[column]].diff()
    ma_type = ma_type.lower()

    # Make two series: one for lower closes and one for higher closes
    up = delta.clip(lower=0)
    down = -1 * delta.clip(upper=0)
    
    # Calculate moving average
    if ma_type == 'alma':
        # Use Arnaud Legoux moving average
        ma_up = alma(up, column, period)
        ma_down = alma(down, column, period)
    elif ma_type == 'ema':
        # Use exponential moving average
        ma_up = ema(up, column, period)
        ma_down = ema(down, column, period)
    else :
        # Use simple moving average
        ma_up = sma(up, column, period)
        ma_down = sma(down, column, period)

    # Calculate RSI
    rs = ma_up / ma_down
    rsi = 100 - (100/(1 + rs))

    # Moving Average column header
    col_header = column + '_' + ma_type + '_' + str(period)

    # Add column to DataFrame with RSI moving average & period in header
    df['rsi_' + col_header] = rsi

    return df

In [None]:
def atr(df, adjusted: bool = True, ma_type: str = 'alma', period: int = 10):

    ma_type = ma_type.lower()

    # Assign HLC variables to DataFrame columns
    if adjusted:
        high = df['adjusted_high']
        low = df['adjusted_low']
        close = df['adjusted_close']
    else:
        high = df['high']
        low = df['low']
        close = df['close']
        
    # Calculate True Ranges
    tr1 = high - low
    tr2 = np.abs(high - close.shift())
    tr3 = np.abs(low - close.shift())

    # Find Max of True Ranges
    df['tr'] = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1)

    # Calculate ATR
    if ma_type == 'alma':
        # Use Arnaud Legoux moving average
        atr = alma(df, 'tr', period)
    elif ma_type == 'ema':
        # Use exponential moving average
        atr = ema(df, 'tr', period)
    else :
        # Use simple moving average
        atr = sma(df, 'tr', period)

    # Drop True Range column
    df.drop(columns='tr', inplace=True)
 
    return atr

## Misc.

In [8]:
def sp500():

    # Read table from Wiki
    sp500 = pd.read_html('https://en.wikipedia.org/wiki/List_of_S%26P_500_companies')[0]

    # Sort
    sp500 = sp500.sort_values(by='Symbol')

    # Convert to list
    sp500 = sp500.Symbol.to_list()

    # Convert Non-Class A Stocks to readable format
    sp500 = [i.replace('.','-') for i in sp500]

    return sp500