In [1]:
import numpy as np
import pandas as pd
from typing import Union
import django_initializer
import yfinance as yf
import ontrack.ta as ta
import sys

# Lift pandas' display limits so frames are shown in full in the notebook.
for _display_option in ('display.max_rows', 'display.max_columns', 'display.width'):
    pd.set_option(_display_option, None)

In [2]:
def get_stock_price(symbol, period="2y", interval="1d", start_date=None, end_date=None):
    """Download price history for ``symbol`` through ``yfinance``.

    Parameters
    ----------
    symbol : str
        Ticker passed to ``yf.download`` as ``tickers``.
    period : str, optional
        Passed through as ``period``. The default is "2y".
    interval : str, optional
        Passed through as ``interval``. The default is "1d".
    start_date, end_date : optional
        Passed through as ``start`` and ``end``. The default is None.

    Returns
    -------
    pd.DataFrame
        The downloaded frame restricted to the columns Date, Open, High,
        Low, Close, Adj Close and Volume, where Date is a datetime copy of
        the frame's index.

    Notes
    -----
    NOTE(review): assumes ``yf.download`` returns flat column names that
    include "Adj Close" -- confirm against the installed yfinance version.
    """
    wanted_columns = ['Date', 'Open', 'High', 'Low', 'Close', "Adj Close", "Volume"]
    prices = yf.download(
        tickers=symbol,
        interval=interval,
        period=period,
        start=start_date,
        end=end_date,
    )
    # Expose the index as a regular column as well.
    prices['Date'] = pd.to_datetime(prices.index)
    return prices.loc[:, wanted_columns]

def ema(
    data: Union[list, np.ndarray, pd.Series], period: int = 14, smoothing: int = 2
) -> list:
    """Exponential Moving Average.

    Parameters
    ----------
    data : list | np.ndarray | pd.Series
        The values to smooth, in order.
    period : int, optional
        Number of leading values averaged to seed the EMA; it also sets the
        weight ``smoothing / (1 + period)``. The default is 14.
    smoothing : int, optional
        Numerator of the weight applied to each new value. The default is 2.

    Returns
    -------
    list
        A list with the same length as ``data``. The first ``period - 1``
        entries are NaN, entry ``period - 1`` is the simple average of the
        first ``period`` values, and the rest follow the EMA recurrence.
        If ``data`` holds fewer than ``period`` values, every entry is NaN.

    Examples
    --------
    >>> ema([1, 2, 3, 4], period=2)
        [nan, 1.5, 2.5, 3.5]
    """
    size = len(data)
    if size < period:
        # Not enough values to seed the average: keep the output aligned
        # with the input instead of returning ``period`` entries.
        return [np.nan] * size

    weight = smoothing / (1 + period)
    averages = [sum(data[:period]) / period]
    for price in data[period:]:
        averages.append((price * weight) + averages[-1] * (1 - weight))

    # Left-pad so that averages[i] lines up with data[i].
    return [np.nan] * (period - 1) + averages

def rolling_signal_list(signals: Union[list, pd.Series]) -> Union[list, pd.Series]:
    """Returns a list which repeats the previous signal, until a new
    signal is given.

    Parameters
    ----------
    signals : list | pd.Series
        A series of signals. Zero values are treated as 'no signal'.

    Returns
    -------
    list | pd.Series
        The rolled signals, the same length as ``signals``. A Series
        input yields a Series that carries the input's index; any other
        input yields a list.

    Examples
    --------
    >>> rolling_signal_list([0,1,0,0,0,-1,0,0,1,0,0])
        [0, 1, 1, 1, 1, -1, -1, -1, 1, 1, 1]
    >>> rolling_signal_list([1,0,0])
        [1, 1, 1]
    """
    # Walk the values by position; indexing a Series with ``signals[i]``
    # is a label lookup and breaks on non-positional indexes.
    values = signals.to_numpy() if isinstance(signals, pd.Series) else signals

    rolling_signals = []
    last_signal = 0
    for value in values:
        # A signal in the very first position is honoured too.
        if value != 0:
            last_signal = value
        rolling_signals.append(last_signal)

    if isinstance(signals, pd.Series):
        rolling_signals = pd.Series(data=rolling_signals, index=signals.index)

    return rolling_signals

def merge_signals(signal_1: list, signal_2: list) -> list:
    """Overlay the non-zero entries of one signal list onto another.

    Parameters
    ----------
    signal_1 : list
        The base signal list. It is copied, not modified.
    signal_2 : list
        The overriding signal list. Wherever it is non-zero, its value
        replaces the value from ``signal_1`` at the same position.

    Returns
    -------
    merged_signal_list : list
        A copy of ``signal_1`` with the non-zero entries of ``signal_2``
        written over it.

    Examples
    --------
    >>> s1 = [1,0,0,0,1,0]
    >>> s2 = [0,0,-1,0,0,-1]
    >>> merge_signals(s1, s2)
        [1, 0, -1, 0, 1, -1]
    """
    combined = signal_1.copy()
    for position in range(len(signal_1)):
        override = signal_2[position]
        if override != 0:
            combined[position] = override
    return combined

def unroll_signal_list(
    signals: Union[list, pd.Series]
) -> Union[np.ndarray, pd.Series]:
    """Unrolls a rolled signal list.

    Parameters
    ----------
    signals : Union[list, pd.Series]
        A rolled signal series, i.e. one where each signal is repeated
        until the next signal arrives.

    Returns
    -------
    unrolled_signals : np.ndarray | pd.Series
        A float array holding each value only at the position where it
        differs from the previous one, and 0 elsewhere. The first value is
        compared against 0 ('no signal'). A Series input yields a Series
        that carries the input's index.

    See Also
    --------
    This function is the inverse of rolling_signal_list.

    Examples
    --------
    >>> unroll_signal_list([0, 1, 1, 1, 1, -1, -1, -1, 1, 1, 1])
        array([ 0.,  1.,  0.,  0.,  0., -1.,  0.,  0.,  1.,  0.,  0.])
    >>> unroll_signal_list([1, 1, 1])
        array([1., 0., 0.])
    """
    # Walk the values by position; ``signals[i - 1]`` at i == 0 used to wrap
    # around to the last element (or raise KeyError on a Series).
    values = signals.to_numpy() if isinstance(signals, pd.Series) else signals

    unrolled_signals = np.zeros(len(values))
    previous = 0
    for i, value in enumerate(values):
        if value != previous:
            unrolled_signals[i] = value
        previous = value

    if isinstance(signals, pd.Series):
        unrolled_signals = pd.Series(data=unrolled_signals, index=signals.index)

    return unrolled_signals

def candles_between_crosses(crosses: Union[list, pd.Series]) -> Union[list, pd.Series]:
    """Returns a rolling count of candles since the last cross/signal occurred.

    Parameters
    ----------
    crosses : list | pd.Series
        The list or Series containing crossover signals. Zero means
        'no cross on this candle'.

    Returns
    -------
    counts : list | pd.Series
        For each candle, the number of consecutive zero entries up to and
        including it; the count is 0 on any candle holding a non-zero
        entry. A Series input yields a Series named "counts" that carries
        the input's index; any other input yields a list.

    See Also
    ---------
    indicators.crossover

    Examples
    --------
    >>> candles_between_crosses([0, 0, 1, 0, 0, 1, 0])
        [1, 2, 0, 1, 2, 0, 1]
    """
    # Walk the values by position; ``crosses[i]`` on a Series is a label
    # lookup and fails when the index is not 0..n-1.
    values = crosses.to_numpy() if isinstance(crosses, pd.Series) else crosses

    count = 0
    counts = []
    for value in values:
        if value == 0:
            # No cross on this candle - one more candle since the last one
            count += 1
        else:
            # Cross on this candle - reset count
            count = 0
        counts.append(count)

    if isinstance(crosses, pd.Series):
        # Convert to Series
        counts = pd.Series(data=counts, index=crosses.index, name="counts")

    return counts


In [39]:
def find_swings(
    data: Union[pd.DataFrame, pd.Series, list, np.ndarray], n: int = 2
) -> pd.DataFrame:
    """Locates swings in the inputted data using a moving average gradient
    method.

    An EMA of period ``n`` is taken of the highs and of the lows. A swing is
    flagged wherever the sign of the EMA's bar-to-bar change flips, and the
    extreme of the raw data over the ``n`` bars before the flip is recorded.

    Parameters
    ----------
    data : pd.DataFrame | pd.Series | list | np.array
        An OHLC dataframe of price (``High`` and ``Low`` columns are read),
        or an array/list/Series of data from an indicator (eg. RSI), in
        which case the same values serve as both highs and lows.
    n : int, optional
        The moving average period, also the lookback window used to pick
        each extreme. The default is 2.

    Returns
    -------
    swing_df : pd.DataFrame
        A copy of the input (converted to a DataFrame if needed) with four
        added columns:

        - ``Lows``: the current low level. It is reset to the window
          minimum when the low EMA turns upward, and otherwise only moves
          to a lower window minimum.
        - ``Last_lows``: the lowest value ``Lows`` has taken so far.
        - ``Highs``: the current high level. It is reset to the window
          maximum when the high EMA turns downward, and otherwise only
          moves to a higher window maximum.
        - ``Last_highs``: the highest value ``Highs`` has taken so far.

        All four columns hold 0 until a first level has been found.
    """
    # Prepare data
    if isinstance(data, pd.DataFrame):
        # OHLC data
        swing_data_highs = pd.Series(ema(data.High.fillna(0), n), index=data.index)
        swing_data_lows = pd.Series(ema(data.Low.fillna(0), n), index=data.index)
        low_data = data.Low.values
        high_data = data.High.values

    elif isinstance(data, pd.Series):
        # Pandas series data: one smoothed series serves as highs and lows
        smoothed = pd.Series(ema(data.fillna(0), n), index=data.index)
        swing_data_highs = smoothed
        swing_data_lows = smoothed
        low_data = data.to_numpy()
        high_data = low_data

        data = pd.DataFrame(data)

    else:
        # Find swings in alternative data source
        data = pd.Series(data)

        smoothed = pd.Series(ema(data, n), index=data.index)
        swing_data_highs = smoothed
        swing_data_lows = smoothed
        low_data = data.to_numpy()
        high_data = low_data

        data = pd.DataFrame(data)

    # Work on a copy so the caller's frame does not gain the new columns
    data = data.copy()

    # Sign of the bar-to-bar EMA change; the leading NaNs are back-filled
    signed_grad_highs = np.sign((swing_data_highs - swing_data_highs.shift(1)).bfill())
    signed_grad_lows = np.sign((swing_data_lows - swing_data_lows.shift(1)).bfill())

    # Non-zero only where the gradient sign flips:
    #   negative -> the EMA has just turned upward (a trough formed)
    #   positive -> the EMA has just turned downward (a peak formed)
    swings_highs = (signed_grad_highs != signed_grad_highs.shift(1).bfill()) * -signed_grad_highs
    swings_lows = (signed_grad_lows != signed_grad_lows.shift(1).bfill()) * -signed_grad_lows

    # Calculate swing extrema. A value of 0 doubles as 'not set yet'.
    lows = []
    highs = []
    swing_lows = []
    swing_highs = []

    previous_low = 0
    previous_swing_low = 0
    previous_high = 0
    previous_swing_high = 0

    for i, swing in enumerate(swings_lows):
        if swing < 0:
            # EMA turned upward: start a new low level from the last n bars
            previous_low = min(low_data[i - n : i])
        elif swing > 0 or i > n:
            # EMA turned downward, or an ordinary bar once a full window
            # exists: only accept a lower low
            low = min(low_data[i - n : i])
            if previous_low == 0 or previous_low > low:
                previous_low = low
                previous_swing_low = low

        # Keep the running minimum of the low level
        if previous_swing_low == 0 or previous_swing_low > previous_low:
            previous_swing_low = previous_low

        lows.append(previous_low)
        swing_lows.append(previous_swing_low)

    for i, swing in enumerate(swings_highs):
        if swing > 0:
            # EMA turned downward: start a new high level from the last n bars
            previous_high = max(high_data[i - n : i])
        elif swing < 0 or i > n:
            # EMA turned upward, or an ordinary bar once a full window
            # exists: only accept a higher high
            high = max(high_data[i - n : i])
            if previous_high < high:
                previous_high = high
                previous_swing_high = high

        # Keep the running maximum of the high level
        if previous_swing_high < previous_high:
            previous_swing_high = previous_high

        highs.append(previous_high)
        swing_highs.append(previous_swing_high)

    data["Last_lows"] = swing_lows
    data["Lows"] = lows
    data["Last_highs"] = swing_highs
    data["Highs"] = highs
    return data

def classify_swings(swing_df: pd.DataFrame, tol: int = 0) -> pd.DataFrame:
    """Annotates a dataframe of swings (from find_swings) with level ages,
    carried-forward strong levels and a trend signal.

    Parameters
    ----------
    swing_df : pd.DataFrame
        The dataframe returned by find_swings. The columns ``Lows``,
        ``Last_lows``, ``Highs`` and ``Last_highs`` are read.
    tol : int, optional
        The classification tolerance: a level counts as strong once it has
        been unchanged for more than ``tol`` candles. The default is 0.

    Returns
    -------
    swing_df : pd.DataFrame
        A copy of the input with the added columns:

        - ``CSLS_highs`` / ``CSLS_lows``: candles since ``Last_highs`` /
          ``Last_lows`` last changed value.
        - ``LSL`` / ``LSH``: the most recent non-zero ``Lows`` / ``Highs``
          value seen on a row whose ``CSLS_lows`` / ``CSLS_highs`` exceeds
          ``tol``, carried forward.
        - ``Trend_signal``: the sum of a high signal (+1 where ``Highs``
          equals ``Last_highs``, -1 where it is below) and a low signal
          (-1 where ``Lows`` equals ``Last_lows``, +1 where it is above),
          with zeros replaced by the previous non-zero value.

    Notes
    -----
    NOTE(review): detect_divergence in this file reads 'HH', 'HL', 'LH',
    'LL', 'FSL' and 'FSH' columns, none of which are created here.
    """
    # Create copy of swing dataframe
    swing_df = swing_df.copy()

    # Count candles since each running extreme last moved
    new_level_highs = np.where(swing_df.Last_highs != swing_df.Last_highs.shift(), 1, 0)
    swing_df["CSLS_highs"] = candles_between_crosses(new_level_highs)

    new_level_lows = np.where(swing_df.Last_lows != swing_df.Last_lows.shift(), 1, 0)
    swing_df["CSLS_lows"] = candles_between_crosses(new_level_lows)

    # Find strong Support and Resistance zones
    supports = swing_df.CSLS_lows > tol
    resistances = swing_df.CSLS_highs > tol

    # Level value where the zone is strong, 0 elsewhere
    strong_lows = supports * swing_df["Lows"]
    strong_highs = resistances * swing_df["Highs"]

    # Carry the latest strong level forward across the zero gaps
    swing_df["LSL"] = rolling_signal_list(strong_lows)
    swing_df["LSH"] = rolling_signal_list(strong_highs)

    high_signal = rolling_signal_list(
        np.select(
            [
                swing_df['Last_highs'] > swing_df['Highs'],
                swing_df['Last_highs'] == swing_df['Highs'],
            ],
            [-1, 1],
        )
    )
    low_signal = rolling_signal_list(
        np.select(
            [
                swing_df['Last_lows'] < swing_df['Lows'],
                swing_df['Last_lows'] == swing_df['Lows'],
            ],
            [1, -1],
        )
    )

    trend = pd.Series(np.add(low_signal, high_signal), index=swing_df.index)
    # Replace zeros with the previous non-zero value; leading zeros stay 0.
    # (Series.replace(..., method="ffill") is deprecated in pandas.)
    swing_df["Trend_signal"] = (
        trend.mask(trend == 0).ffill().fillna(0).astype(trend.dtype)
    )

    return swing_df

def _ffill_zeros(series: pd.Series) -> pd.Series:
    """Replace zeros with the most recent non-zero value; leading zeros stay 0."""
    return series.mask(series == 0).ffill().fillna(0)


def _window_hits(price_flags: pd.Series, indicator_flags: pd.Series, tol: int) -> list:
    """Flag rows whose previous ``tol`` rows hold more than one hit in total."""
    price_values = price_flags.to_numpy()
    indicator_values = indicator_flags.to_numpy()

    hits = []
    for i in range(len(price_values)):
        if i < tol:
            # No full lookback window yet. Guarding here also stops the
            # negative slice start from wrapping around on short inputs.
            hits.append(False)
            continue
        total = sum(price_values[i - tol : i]) + sum(indicator_values[i - tol : i])
        hits.append(bool(total > 1))
    return hits


def detect_divergence(
    classified_price_swings: pd.DataFrame,
    classified_indicator_swings: pd.DataFrame,
    tol: int = 2,
    method: int = 0,
) -> pd.DataFrame:
    """Detects divergence between price swings and swings in an indicator.

    Parameters
    ----------
    classified_price_swings : pd.DataFrame
        Classified swings of OHLC data. Method 0 reads the boolean columns
        'HH', 'HL', 'LH' and 'LL'; method 1 reads 'Lows' and 'Highs'.
    classified_indicator_swings : pd.DataFrame
        Classified swings of indicator data. Both methods read the boolean
        columns 'HH', 'HL', 'LH' and 'LL'; method 1 also reads 'FSL' and
        'FSH'.
    tol : int, optional
        The number of candles which conditions must be met within (method 0
        only). The default is 2.
    method : int, optional
        The method to use when detecting divergence (0 or 1). The default is 0.

    Raises
    ------
    ValueError
        When an unrecognised method of divergence detection is requested.
        ValueError is a subclass of Exception, so callers catching
        Exception are unaffected.

    Returns
    -------
    divergence : pd.DataFrame
        A dataframe indexed like ``classified_price_swings`` with the
        columns 'regularBull', 'regularBear', 'hiddenBull' and 'hiddenBear'.

    Notes
    -----
    Options for the method include:
        0: use both price and indicator swings to detect divergence (default)
        1: use only indicator swings to detect divergence (more responsive)

    NOTE(review): in method 0 the test is "more than one hit in total over
    the two lookback windows", so two hits in a single column also satisfy
    it -- confirm this is intended.

    NOTE(review): classify_swings in this file does not create the 'HH',
    'HL', 'LH', 'LL', 'FSL' or 'FSH' columns read here.
    """
    if method not in (0, 1):
        raise ValueError("Error: unrecognised method of divergence detection.")

    price = classified_price_swings
    indicator = classified_indicator_swings

    if method == 0:
        # Price and indicator must both have swung within the last tol rows;
        # each flagged run is then reduced to its first row.
        regular_bullish = unroll_signal_list(_window_hits(price["LL"], indicator["HL"], tol))
        regular_bearish = unroll_signal_list(_window_hits(price["HH"], indicator["LH"], tol))
        hidden_bullish = unroll_signal_list(_window_hits(price["HL"], indicator["LL"], tol))
        hidden_bearish = unroll_signal_list(_window_hits(price["LH"], indicator["HH"], tol))
    else:
        # Use indicator swings only to detect divergence.
        # Price level on rows where the indicator marks a swing, 0 elsewhere.
        price_at_indi_lows = (indicator["FSL"] != 0) * price["Lows"]
        price_at_indi_highs = (indicator["FSH"] != 0) * price["Highs"]

        # Determine change in price between indicator lows / highs; the
        # np.sign factor zeroes the change on rows without a swing.
        price_at_indi_lows_change = np.sign(price_at_indi_lows) * (
            price_at_indi_lows - _ffill_zeros(price_at_indi_lows).shift()
        )
        price_at_indi_highs_change = np.sign(price_at_indi_highs) * (
            price_at_indi_highs - _ffill_zeros(price_at_indi_highs).shift()
        )

        # DETECT DIVERGENCES
        regular_bullish = indicator["HL"] & (price_at_indi_lows_change < 0)
        regular_bearish = indicator["LH"] & (price_at_indi_highs_change > 0)
        hidden_bullish = indicator["LL"] & (price_at_indi_lows_change > 0)
        hidden_bearish = indicator["HH"] & (price_at_indi_highs_change < 0)

    divergence = pd.DataFrame(
        data={
            "regularBull": regular_bullish,
            "regularBear": regular_bearish,
            "hiddenBull": hidden_bullish,
            "hiddenBear": hidden_bearish,
        },
        index=classified_price_swings.index,
    )

    return divergence

def autodetect_divergence(
    ohlc: pd.DataFrame,
    indicator_data: pd.DataFrame,
    tolerance: int = 1,
    method: int = 0,
) -> pd.DataFrame:
    """Run the full divergence pipeline on OHLC price data and indicator data.

    Both inputs go through find_swings and then classify_swings, and the two
    classified frames are handed to detect_divergence.

    Parameters
    ----------
    ohlc : pd.DataFrame
        A dataframe of OHLC price data.
    indicator_data : pd.DataFrame
        The indicator data to compare against price.
    tolerance : int, optional
        Forwarded to detect_divergence as ``tol``. The default is 1.
    method : int, optional
        Forwarded to detect_divergence as ``method`` (0: price and indicator
        swings, 1: indicator swings only). The default is 0.

    Returns
    -------
    divergence : pd.DataFrame
        Whatever detect_divergence returns for the two classified frames.

    See Also
    --------
    find_swings
    classify_swings
    detect_divergence

    Notes
    -----
    NOTE(review): detect_divergence in this file reads 'HH', 'HL', 'LH',
    'LL', 'FSL' and 'FSH' columns that classify_swings in this file does
    not create -- confirm this pipeline still runs end to end.
    """
    # Price side first, then the indicator side
    classified_price = classify_swings(find_swings(ohlc))
    classified_indicator = classify_swings(find_swings(indicator_data))

    return detect_divergence(
        classified_price,
        classified_indicator,
        tol=tolerance,
        method=method,
    )

In [44]:
# Ticker to analyse. Other symbols tried in this notebook:
# 'IDFCFIRSTB.NS', '^NSEBANK', '^NSEI', 'MSFT'.
symbol = 'TCS.NS'

# One month of 15-minute bars (a "1y" / "1d" download was used previously).
df_y = get_stock_price(symbol, period="1mo", interval="15m")

[*********************100%***********************]  1 of 1 completed


In [45]:
def _color_red_or_green(val):
    color = 'red' if val < 0 else 'green'
    return 'color: %s' % color

# Append an RSI column to df_y in place (presumably the "RSI_14" column seen
# in the output below -- confirm against the `ta` accessor in use).
df_y.ta.rsi(close=df_y["Close"], append=True)

# Divergence detection is currently switched off:
# result = autodetect_divergence(df_y, df_y["RSI_14"], method=1)
# result[(result["hiddenBear"] > 0) | (result['regularBear'] > 0) | (result["hiddenBull"] > 0) | (result["regularBull"] > 0)]

# Locate the swings, classify them, and render the frame with a blue gradient.
result = classify_swings(find_swings(df_y))
result.style.background_gradient(cmap='Blues')

Unnamed: 0_level_0,Date,Open,High,Low,Close,Adj Close,Volume,RSI_14,Last_lows,Lows,Last_highs,Highs,CSLS_highs,CSLS_lows,LSL,LSH,Trend_signal
Datetime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1
2022-10-28 12:45:00,2022-10-28 12:45:00,3164.949951,3165.449951,3163.100098,3164.399902,3164.399902,20057,,0.0,0.0,0.0,0.0,0,0,0.0,0.0,0
2022-10-28 13:00:00,2022-10-28 13:00:00,3164.949951,3166.0,3164.050049,3165.050049,3165.050049,37078,,0.0,0.0,0.0,0.0,1,1,0.0,0.0,0
2022-10-28 13:15:00,2022-10-28 13:15:00,3165.100098,3166.0,3164.5,3165.949951,3165.949951,43181,,0.0,0.0,0.0,0.0,2,2,0.0,0.0,0
2022-10-28 13:30:00,2022-10-28 13:30:00,3164.949951,3166.25,3164.050049,3165.449951,3165.449951,30990,,3164.050049,3164.050049,3166.0,3166.0,0,0,0.0,0.0,0
2022-10-28 13:45:00,2022-10-28 13:45:00,3165.050049,3166.0,3162.800049,3164.399902,3164.399902,31586,,3164.050049,3164.050049,3166.25,3166.25,0,1,3164.050049,0.0,0
2022-10-28 14:00:00,2022-10-28 14:00:00,3164.050049,3165.5,3163.0,3164.899902,3164.899902,35198,,3162.800049,3162.800049,3166.25,3166.25,1,0,3164.050049,3166.25,0
2022-10-28 14:15:00,2022-10-28 14:15:00,3164.899902,3166.0,3164.149902,3165.0,3165.0,34002,,3162.800049,3162.800049,3166.25,3166.25,2,1,3162.800049,3166.25,0
2022-10-28 14:30:00,2022-10-28 14:30:00,3165.0,3166.899902,3164.149902,3165.0,3165.0,40727,,3162.800049,3162.800049,3166.25,3166.25,3,2,3162.800049,3166.25,0
2022-10-28 14:45:00,2022-10-28 14:45:00,3164.949951,3165.899902,3161.0,3162.0,3162.0,45262,,3162.800049,3162.800049,3166.899902,3166.899902,0,3,3162.800049,3166.25,0
2022-10-28 15:00:00,2022-10-28 15:00:00,3162.050049,3165.800049,3161.75,3164.350098,3164.350098,122697,,3161.0,3161.0,3166.899902,3166.899902,1,0,3162.800049,3166.899902,0


In [40]:
# Small frame with gaps, to compare forward fill against backward fill.
_demo_columns = {
    "A": [None, "HL", "LH", "LL", None, None],
    "B": ["HH", "LL", None, None, None, "8"],
    "C": [None, "HL", "LL", "LH", None, "8"],
}
df = pd.DataFrame(_demo_columns)
print(df)

# Forward fill: each gap takes the last value above it.
print(df.ffill(axis=0))
# Backward fill: each gap takes the next value below it.
print(df.bfill(axis=0))

      A     B     C
0  None    HH  None
1    HL    LL    HL
2    LH  None    LL
3    LL  None    LH
4  None  None  None
5  None     8     8
      A   B     C
0  None  HH  None
1    HL  LL    HL
2    LH  LL    LL
3    LL  LL    LH
4    LL  LL    LH
5    LL   8     8
      A   B   C
0    HL  HH  HL
1    HL  LL  HL
2    LH   8  LL
3    LL   8  LH
4  None   8   8
5  None   8   8
