Fetch OHLCV data

In [None]:
import ccxt
import pandas as pd

def fetch_data(symbol='BTC/USDT', timeframe='1h', limit=1000):
    """Download OHLCV candles from Binance and return them as a DataFrame.

    Args:
        symbol: Market pair passed to ``fetch_ohlcv``.
        timeframe: Candle size passed to ``fetch_ohlcv``.
        limit: Maximum number of candles requested.

    Returns:
        A DataFrame indexed by ``timestamp`` (millisecond epoch values
        converted to datetimes) with the columns ``open``, ``high``,
        ``low``, ``close`` and ``volume``.
    """
    exchange = ccxt.binance()
    candles = exchange.fetch_ohlcv(symbol, timeframe, limit=limit)

    column_names = ['timestamp', 'open', 'high', 'low', 'close', 'volume']
    frame = pd.DataFrame(candles, columns=column_names)

    # The exchange reports candle open times as epoch milliseconds
    frame['timestamp'] = pd.to_datetime(frame['timestamp'], unit='ms')

    return frame.set_index('timestamp')

Helper function to save the data to a CSV file

In [None]:
def save_data_to_csv(df, filename="data.csv"):
    """Write *df* to *filename* as CSV using pandas' default options."""
    df.to_csv(path_or_buf=filename)

Calculate the Average True Range (ATR)

In [None]:
def calculate_atr(data):
    """Return the mean true range over every row of *data*.

    The true range of a row is the largest of:

    * ``high - low``
    * ``abs(high - previous close)``
    * ``abs(low - previous close)``

    The first row has no previous close, so the two close-based terms are
    NaN there and the row-wise maximum (which skips NaN) falls back to
    ``high - low``.

    Note that this is a single average over the whole frame, not a rolling
    N-period ATR series.

    Args:
        data: DataFrame with ``high``, ``low`` and ``close`` columns.
            It is not modified.

    Returns:
        The mean of the per-row true range as a scalar.
    """
    previous_close = data['close'].shift(1)

    # assign() works on a copy, so the helper columns never leak into the
    # caller's DataFrame.
    components = data.assign(
        hl=data['high'] - data['low'],
        hc=(data['high'] - previous_close).abs(),
        lc=(data['low'] - previous_close).abs(),
    )

    # True range is the max of hl, hc, and lc
    true_range = components[['hl', 'hc', 'lc']].max(axis=1)

    return true_range.mean()

Fetch and save the data

In [None]:
# Download with fetch_data's defaults: BTC/USDT, 1h candles, limit of 1000
data = fetch_data()

# Save data to CSV
# (optional step, left disabled; uncomment the call below to write the frame to disk)
# save_data_to_csv(data, "btc_usdt_data.csv")

# Show the last rows of the download
print(data.tail())

Calculate the ATR

In [None]:
# Mean true range across all fetched candles; the price-interval width
# further down is derived from this value.
atr_value = calculate_atr(data)
print(atr_value)

Determine the Overall Price Range

In [None]:
# Extremes of the fetched window: the lowest low and the highest high
lowest_price, highest_price = data['low'].min(), data['high'].max()

Create Price Intervals

In [None]:
# Bin width: the ATR rounded to the nearest 10, but never less than 1.
# An ATR that rounds to 0 would otherwise give range() a zero step, which
# raises ValueError.
interval_size = max(int(round(atr_value, -1)), 1)

# Bin edges start at the integer part of the lowest price. The stop is one
# unit beyond `int(highest_price) + interval_size` so that the final edge is
# always strictly above highest_price. Without the extra unit the last edge
# can land exactly on int(highest_price), and a close at the top of the range
# then fits no half-open [edge, next_edge) bin.
price_intervals = list(range(int(lowest_price), int(highest_price) + interval_size + 1, interval_size))

Display the results

In [None]:
# Report the price extremes and preview the first few interval edges
for label, shown in (
    ("Lowest Price:", lowest_price),
    ("Highest Price:", highest_price),
    ("First 5 Intervals:", price_intervals[:5]),
):
    print(label, shown)

Assign Price Intervals and TPO Symbols

In [None]:
def assign_interval(price, intervals):
    """Return the lower edge of the bin that contains *price*.

    Consecutive entries of *intervals* are treated as half-open bins
    ``[lower, upper)``. The first bin that contains *price* wins; ``None``
    is returned when no bin matches (including when *intervals* has fewer
    than two entries).
    """
    adjacent_edges = zip(intervals, intervals[1:])
    return next(
        (lower for lower, upper in adjacent_edges if lower <= price < upper),
        None,
    )

# Bucket every period's close into the price interval that contains it
data['interval'] = data['close'].apply(
    lambda close_price: assign_interval(close_price, price_intervals)
)

# Label the periods sequentially: A01, A02, ... (zero-padded to at least two digits)
data['symbol'] = [f"A{position:02d}" for position in range(1, len(data) + 1)]

# Show the last 10 rows for a quick check
print(data[['close', 'interval', 'symbol']].tail(10))

Calculate POC, VAL, and VAH

In [None]:
# Determine POC, VAL, VAH
# TPO count per price interval. value_counts() leaves out rows whose close
# was not assigned to any interval.
interval_counts = data['interval'].value_counts()
if interval_counts.empty:
    raise ValueError("No close fell inside any price interval; cannot compute POC, VAL and VAH")

# Point of control: the interval with the most TPOs
poc = interval_counts.idxmax()
sorted_intervals = interval_counts.sort_values(ascending=False)
cumulative_counts = sorted_intervals.cumsum()
total_tpos = cumulative_counts.iloc[-1]

# Start with 70% as the target for value area, but adjust if necessary
value_area_percentage = 0.7

# Calculate initial VAL and VAH: the busiest intervals whose running TPO
# total stays within the target share.
val_vah_range = sorted_intervals[cumulative_counts <= value_area_percentage * total_tpos]
if val_vah_range.empty:
    # The busiest interval alone exceeds the target share, so nothing was
    # selected. Anchor both bounds on the POC instead of letting them become
    # NaN; the widening loop below then spreads them apart.
    val = vah = poc
else:
    val = val_vah_range.index.min()
    vah = val_vah_range.index.max()

# If VAL is the same as POC, move it to the next significant TPO count.
# Only possible when a second populated interval exists.
# NOTE(review): the replacement interval may lie above the POC (or above VAH);
# that matches the original behaviour -- confirm it is intended.
if val == poc and len(sorted_intervals) > 1:
    val = sorted_intervals.index[1]  # This takes the next highest TPO count after POC

# Find closest intervals to the quantile-based VAL and VAH
closest_val = min(price_intervals, key=lambda x: abs(x - val))
closest_vah = min(price_intervals, key=lambda x: abs(x - vah))

# Ensure minimum distance between VAL and VAH
MIN_INTERVALS_BETWEEN_VAL_VAH = 3

# Work with list positions so each step is a simple integer move rather than
# a repeated linear search through price_intervals.
val_pos = price_intervals.index(closest_val)
vah_pos = price_intervals.index(closest_vah)
poc_pos = price_intervals.index(poc)
last_pos = len(price_intervals) - 1

while (vah_pos - val_pos) < MIN_INTERVALS_BETWEEN_VAL_VAH:
    # Prefer lowering VAL while it is still below the POC; otherwise raise VAH
    if poc_pos > val_pos and val_pos > 0:
        val_pos -= 1
    elif vah_pos < last_pos:
        vah_pos += 1
    else:
        # Break out of the loop if we can't adjust further
        break

closest_val = price_intervals[val_pos]
closest_vah = price_intervals[vah_pos]

val = closest_val
vah = closest_vah

print(f"POC: {poc}")
print(f"VAL: {val}")
print(f"VAH: {vah}")

 Identify Single Prints

In [None]:
# Single prints: price intervals that were hit by exactly one TPO (symbol)
has_one_tpo = interval_counts.eq(1)
single_prints = interval_counts.loc[has_one_tpo].index.tolist()

print("Single Prints:", single_prints)

 Visualize the TPO chart using Plotly

In [None]:
import plotly.graph_objects as go

# Candlestick trace built from the OHLC columns
candles = go.Candlestick(
    x=data.index,
    open=data['open'],
    high=data['high'],
    low=data['low'],
    close=data['close'],
)
fig = go.Figure(data=[candles])

# Write each period's symbol at the level of its assigned interval, in a
# small purple font so it stands apart from the candles
for stamp, level, tag in zip(data.index, data['interval'], data['symbol']):
    fig.add_annotation(
        x=stamp,
        y=level,
        text=tag,
        showarrow=False,
        font=dict(size=8, color='purple'),
    )

# Horizontal lines run across the full time range of the data
first_stamp, last_stamp = data.index[0], data.index[-1]

# POC is a solid blue line; VAL and VAH are dotted red and green lines
shapes = [
    dict(type='line', x0=first_stamp, x1=last_stamp, y0=poc, y1=poc,
         line=dict(color='blue', width=1.5)),
    dict(type='line', x0=first_stamp, x1=last_stamp, y0=val, y1=val,
         line=dict(color='red', width=1.5, dash='dot')),
    dict(type='line', x0=first_stamp, x1=last_stamp, y0=vah, y1=vah,
         line=dict(color='green', width=1.5, dash='dot')),
]
fig.update_layout(shapes=shapes, height=1200, template="plotly_dark")

# Label the three levels at the right edge; the yshift values keep the
# labels from overlapping
fig.add_annotation(x=last_stamp, y=poc, text=f"POC: {poc}", showarrow=False, yshift=20)
fig.add_annotation(x=last_stamp, y=val, text=f"VAL: {val}", showarrow=False, yshift=-20)
fig.add_annotation(x=last_stamp, y=vah, text=f"VAH: {vah}", showarrow=False, yshift=20)

# Mark every single print with a yellow dash-dot line and a label
single_print_style = dict(color="yellow", width=1.5, dash="dashdot")
for sp in single_prints:
    fig.add_shape(
        type="line",
        x0=first_stamp,
        x1=last_stamp,
        y0=sp,
        y1=sp,
        line=single_print_style,
    )
    fig.add_annotation(x=last_stamp, y=sp, text=f"SP: {sp}", showarrow=False, yshift=10)

# Display the chart
fig.show()
