##### The VPN indicator calculates the difference of up day volume and down day volume, divided by the total volume. A day is an up day when its typical price – the average of high, low, and close – is higher than yesterday’s typical price plus 1/10 ATR. The opposite is true for down days. The result is multiplied by 100 and smoothed with a 3-day EMA. The C code of this indicator:

In [1]:
import pandas as pd
import numpy as np

##### Je calcul l'ATR, pour chaque bougie je prend 1/10 de la valeur de l'ATR de la veille puis je regarde si la bougie actuel a été up de + de 1/10 de l'ATR de hier ou le contraire.


In [None]:

""" Faire attention car quand tu sends la data, pour faire l'ATR notament tu vas avoir besoin de period + 1 pour faire fonctionner l'algo.. """
""" La période optimal serait une période de 30 data """

def calculate_typical_price(data):
    """Calcule le prix typique (average de high, low, close)"""
    return (data['high'] + data['low'] + data['close']) / 3

def calculate_true_range(high, low, close_prev):
    """Calcule le true range pour l'ATR"""
    tr1 = high - low
    tr2 = np.abs(high - close_prev)
    tr3 = np.abs(low - close_prev)
    return np.maximum.reduce([tr1, tr2, tr3])

def calculate_atr(data, period):
    """Calcule l'ATR pour une certaine période puis renvoie la valeur"""
    data.loc[:, 'Close_prev'] = data['close'].shift(1)
    data.loc[:, 'TR'] = calculate_true_range(data['high'], data['low'], data['Close_prev'])
    alpha = 2 / (period + 1)
    data.loc[:, 'ATR'] = data['TR'].ewm(span=period, adjust=False).mean()
    return data


def calculate_vpn(data, period):
    """Calcule l'indicateur VPN"""
    # Étape 1: Calculer l'ATR et le prix typique
    data = calculate_atr(data, period)
    data['TypPrice'] = calculate_typical_price(data)
    data['Dist'] = 0.1 * data['ATR'].shift(1)

    # Initialiser les colonnes pour Vp, Vn et Vtotal
    data['Vp'] = 0.0
    data['Vn'] = 0.0
    data['Vtotal'] = 0.0

    for i in range(1, len(data)):
        typ_price = data.loc[data.index[i], 'TypPrice']
        typ_price_prev = data.loc[data.index[i - 1], 'TypPrice']
        dist = data.loc[data.index[i], 'Dist']
        volume = data.loc[data.index[i], 'volume']

        if typ_price > typ_price_prev + dist:
            data.loc[data.index[i], 'Vp'] = volume
        elif typ_price < typ_price_prev - dist:
            data.loc[data.index[i], 'Vn'] = volume

        data.loc[data.index[i], 'Vtotal'] = volume

    # Étape 3 : Calculer le VPN brut
    data['Vp_sum'] = data['Vp'].rolling(window=period).sum()
    data['Vn_sum'] = data['Vn'].rolling(window=period).sum()
    data['Vtotal_sum'] = data['Vtotal'].rolling(window=period).sum()

    data['VPN_raw'] = 100 * (data['Vp_sum'] - data['Vn_sum']) / data['Vtotal_sum']

    # Étape 4 : Lissage avec une EMA
    data['VPN'] = data['VPN_raw'].ewm(span=3, adjust=False).mean()

    return data

