### 3.1 Calculate Simple Moving Average

In [1]:
import pandas as pd

In [2]:
def calculate_simple_moving_average(series: pd.Series, n: int=20) -> pd.Series:
    """
    Return the simple moving average of `series` over a trailing window.

    Args:
        series: The values to average.
        n: Number of consecutive observations in each window.

    Returns:
        A series where each entry is the mean of the current observation and
        the n-1 observations before it. Entries that do not yet have a full
        window are NaN (pandas' default for a fixed-size rolling window).
    """
    trailing_window = series.rolling(n)
    return trailing_window.mean()

In [3]:
def calculate_simple_moving_sample_stdev(series: pd.Series, n: int=20) -> pd.Series:
    """
    Return the rolling sample standard deviation of `series`.

    Args:
        series: The values to measure.
        n: Number of consecutive observations in each window.

    Returns:
        A series where each entry is the sample standard deviation (pandas'
        default, ddof=1) of the trailing n observations. Entries that do not
        yet have a full window are NaN.
    """
    trailing_window = series.rolling(n)
    return trailing_window.std()

### 3.2 Slow Simple Moving Average

In [4]:
import numpy as np
from typing import List

In [5]:
def slow_moving_average(values: List[float], m: int=20) -> list:
    """
    Compute the simple moving average of `values` over a window of length m.

    This is O(nm) time, because it re-computes the sum at every step
    1 + 2 + 3 + 4 + ... / m
    2 + 3 + 4 + 5 + ... / m
    3 + 4 + 5 + 6 + ... / m
    4 + 5 + 6 + 7 + ... / m
    and so on ...
    Leading to approx (m-1) * n individual additions.

    Args:
        values: The observations to average.
        m: The window length; must be at least 1.

    Returns:
        A list with the same length as `values`. Positions that do not yet
        have a full window of m observations hold None; every other position
        holds the mean of the m most recent observations.

    Raises:
        ValueError: If m is less than 1.
    """
    if m < 1:
        raise ValueError('m must be a positive integer')

    # Initial values. Cap the number of placeholders at len(values) so the
    # output never ends up longer than the input when there are fewer than
    # m-1 observations.
    moving_average = [None] * min(m - 1, len(values))

    for i in range(m - 1, len(values)):
        # Deliberately re-average the whole window on every step.
        the_average = np.mean(values[(i - m + 1):i + 1])
        moving_average.append(the_average)

    return moving_average

### 3.3 Fast Simple Moving Average

In [6]:
def fast_moving_average(values: List[float], m: int=20) -> list:
    """
    Compute the simple moving average of `values` over a window of length m.

    This is O(n) time, because it keeps track of the intermediate sum.
    Leading to approx 2n individual additions.

    Args:
        values: The observations to average.
        m: The window length; must be at least 1.

    Returns:
        A list with the same length as `values`. Positions that do not yet
        have a full window of m observations hold None; every other position
        holds the mean of the m most recent observations.

    Raises:
        ValueError: If m is less than 1.
    """
    if m < 1:
        raise ValueError('m must be a positive integer')

    n = len(values)

    # Without at least one full window there is nothing to average. Returning
    # early avoids dividing a partial sum by m and avoids emitting more
    # entries than there are inputs.
    if n < m:
        return [None] * n

    # Initial values
    moving_average = [None] * (m - 1)
    accumulator = sum(values[:m])
    moving_average.append(accumulator / m)

    for i in range(m, n):
        # Slide the window: drop the oldest observation, add the newest.
        accumulator -= values[i - m]
        accumulator += values[i]
        moving_average.append(accumulator / m)

    return moving_average

### 3.4 Calculating MACD

In [7]:
def calculate_macd_oscillator(series: pd.Series,
    n1: int=5, n2: int=34) -> pd.Series:
    """
    Calculate the moving average convergence divergence oscillator, given a
    short moving average of length n1 and a long moving average of length n2.

    Both averages are simple moving averages of `series`; the oscillator is
    the short average minus the long average.

    Args:
        series: The values to compute the oscillator for.
        n1: Window length of the short moving average.
        n2: Window length of the long moving average.

    Returns:
        The short moving average minus the long moving average.

    Raises:
        ValueError: If n1 is not strictly less than n2.
    """
    # Raise explicitly rather than assert, so the check still runs when
    # Python is started with -O.
    if n1 >= n2:
        raise ValueError('n1 must be less than n2')

    short_average = calculate_simple_moving_average(series, n1)
    long_average = calculate_simple_moving_average(series, n2)
    return short_average - long_average

### 3.5 Calculate Bollinger Bands

In [8]:
def calculate_bollinger_bands(series: pd.Series, n: int=20,
    num_std: float=2) -> pd.DataFrame:
    """
    Calculates the bollinger bands and returns them as a dataframe.

    Args:
        series: The values to compute the bands for.
        n: Window length used for both the moving average and the moving
            standard deviation.
        num_std: How many standard deviations the upper and lower bands sit
            away from the middle band. Defaults to 2.

    Returns:
        A dataframe with three columns: 'middle' (the simple moving average),
        'upper' (middle + num_std * stdev) and 'lower'
        (middle - num_std * stdev).
    """

    sma = calculate_simple_moving_average(series, n)
    stdev = calculate_simple_moving_sample_stdev(series, n)

    # Distance from the middle band to either outer band.
    band_offset = num_std * stdev

    return pd.DataFrame({
        'middle': sma,
        'upper': sma + band_offset,
        'lower': sma - band_offset
    })

### 3.6 Calculate Chaikin Money Flow

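$$
Money\;Flow\;Volume_t = \frac{(Close_t - Low_t) - (High_t - Close_t)}{High_t - Low_t} \times Volume_t
$$

$$
Chaikin\;Money\;Flow_t = \frac{\sum_{i=t-n+1}^{t} Money\;Flow\;Volume_i}{\sum_{i=t-n+1}^{t} Volume_i}
$$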

In [9]:
def calculate_money_flow_volume_series(df: pd.DataFrame) -> pd.Series:
    """
    Calculates the per-period money flow volume series.

    Each period's value is
    volume * ((close - low) - (high - close)) / (high - low).

    Args:
        df: A dataframe with 'high', 'low', 'close' and 'volume' columns.

    Returns:
        The money flow volume for every row of `df`. Rows where high equals
        low get 0 instead of the NaN/inf a division by zero would produce,
        so they do not poison rolling sums computed from this series. Rows
        with missing inputs stay NaN.
    """
    price_range = df['high'] - df['low']
    mfv = df['volume'] * (2*df['close'] - df['high'] - df['low']) / \
                                    price_range

    # A zero-range bar divides by zero; treat it as contributing no flow.
    # NaN ranges compare as != 0, so genuinely missing data stays NaN.
    return mfv.where(price_range != 0, 0.0)

In [10]:
def calculate_money_flow_volume(df: pd.DataFrame, n: int=20) -> pd.Series:
    """
    Sum the per-period money flow volume over a trailing window of n periods
    (the quantity called q_t in our formula).

    Args:
        df: The dataframe passed through to
            calculate_money_flow_volume_series.
        n: Number of consecutive periods in each window.

    Returns:
        The rolling n-period sum of the money flow volume series.
    """
    per_period_flow = calculate_money_flow_volume_series(df)
    trailing_window = per_period_flow.rolling(n)
    return trailing_window.sum()

In [11]:
def calculate_chaikin_money_flow(df: pd.DataFrame, n: int=20) -> pd.Series:
    """
    Compute the Chaikin money flow over a trailing window of n periods.

    Args:
        df: A dataframe with a 'volume' column, plus whatever columns
            calculate_money_flow_volume reads.
        n: Number of consecutive periods in each window.

    Returns:
        The rolling n-period money flow volume divided by the rolling
        n-period sum of 'volume'.
    """
    summed_flow = calculate_money_flow_volume(df, n)
    summed_volume = df['volume'].rolling(n).sum()
    return summed_flow / summed_volume

### 3.7 Example Signals

In [12]:
def create_macd_signal(series: pd.Series, n1: int=5, n2: int=34) -> pd.Series:
    """
    Build a momentum signal from MACD zero-line crossovers.

    The result carries the sign of the MACD (+1 buy, -1 sell) on every row
    where that sign differs from the previous row's sign, and zero on rows
    where the sign is unchanged.

    Args:
        series: The values to compute the signal for.
        n1: Window length of the short moving average.
        n2: Window length of the long moving average.

    Returns:
        The MACD sign multiplied by a changed/unchanged mask.
    """

    # Reduce the oscillator to its sign: -1, 0 or 1.
    current_sign = np.sign(calculate_macd_oscillator(series, n1, n2))

    # The same signs, moved down one row, so each row sees its predecessor.
    previous_sign = current_sign.shift(1, axis=0)

    # True wherever the sign differs from the row before it.
    sign_changed = current_sign != previous_sign

    # Multiplying by the mask treats True as 1 and False as 0, so only the
    # rows where the sign changed keep their sign.
    return current_sign * sign_changed

In [13]:
def create_bollinger_band_signal(series: pd.Series, n: int=20) -> pd.Series:
    """
    Build a reversal signal from the Bollinger bands of `series`.

    Args:
        series: The price series to compute the signal for.
        n: Window length passed to calculate_bollinger_bands.

    Returns:
        An integer series: 1 (buy) where the price is below the lower band,
        -1 (sell) where it is above the upper band, and 0 otherwise.
    """
    bands = calculate_bollinger_bands(series, n)

    above_upper = series > bands['upper']
    below_lower = series < bands['lower']

    # Turn the boolean masks into 0/1 integers; buy counts +1, sell counts -1.
    return below_lower.astype(int) - above_upper.astype(int)