# Just for Test - temp code

### Import Library

In [1]:
import numpy as np
import pandas as pd
import numpy as np
import pandas_ta as ta
import seaborn as sns
from scipy.signal import find_peaks
from scipy.ndimage import gaussian_filter1d
from scipy.stats import skew

import matplotlib.pyplot as plt
plt.rcParams['figure.figsize'] = [12, 6]
plt.rcParams['figure.dpi'] = 120
import warnings
warnings.filterwarnings('ignore')

### Load Price Data

In [5]:
import os
from pathlib import Path
notebook_path = os.getcwd()
current_dir = Path(notebook_path)
csv_file = str(current_dir) + '/VN30F1M_5minutes.csv'
is_file = os.path.isfile(csv_file)
if is_file:
    dataset = pd.read_csv(csv_file, index_col='Date', parse_dates=True)
else:
    print('remote')
    dataset = pd.read_csv("https://raw.githubusercontent.com/zuongthaotn/vn-stock-data/main/VN30ps/VN30F1M_5minutes.csv", index_col='Date', parse_dates=True)

In [6]:
data = dataset.copy()

In [None]:
# data = data[data.index > '2020-11-01 00:00:00']

In [7]:
data

Unnamed: 0_level_0,Open,High,Low,Close,Volume
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2018-08-13 09:00:00,943.5,943.6,942.9,943.1,1812
2018-08-13 09:05:00,943.1,943.5,942.9,943.3,1323
2018-08-13 09:10:00,943.2,943.3,942.6,943.1,1207
2018-08-13 09:15:00,943.1,943.1,942.3,942.6,1196
2018-08-13 09:20:00,942.6,943.7,942.4,943.7,1765
...,...,...,...,...,...
2025-02-14 14:15:00,1343.0,1343.0,1340.3,1341.3,7141
2025-02-14 14:20:00,1340.9,1341.9,1340.5,1341.4,4593
2025-02-14 14:25:00,1341.1,1342.5,1340.7,1342.5,4207
2025-02-14 14:30:00,1342.5,1342.5,1342.5,1342.5,150


In [None]:
# optional sklearn import for GMM
try:
    from sklearn.mixture import GaussianMixture
    SKLEARN_AVAILABLE = True
except Exception:
    SKLEARN_AVAILABLE = False

# ---------- helper: weighted skewness ----------
def weighted_skewness(values, weights):
    """
    Weighted skewness (Pearson's moment coefficient) for arrays values and weights.
    """
    values = np.asarray(values, dtype=float)
    weights = np.asarray(weights, dtype=float)
    if weights.sum() == 0:
        return 0.0
    w_mean = (weights * values).sum() / weights.sum()
    diff = values - w_mean
    w_var = (weights * diff**2).sum() / weights.sum()
    if w_var == 0:
        return 0.0
    w_std = np.sqrt(w_var)
    m3 = (weights * diff**3).sum() / weights.sum()
    return m3 / (w_std**3)


# ---------- advanced detect function ----------
def detect_vp_shape_advanced(price_levels, vol_profile, 
                              smooth_sigma=1.0,
                              peak_rel_height=0.15,
                              peak_prominence_rel=0.12,
                              gmm_max_components=3,
                              gmm_weight_threshold=0.12,
                              secondary_peak_ratio=0.35):
    """
    Return: (shape, diagnostics_dict)
    shape in {'D','P','b','Double','Undefined'}
    diagnostics: dict with peaks, gmm_info, skewness, etc.
    """
    res = {}
    vol = np.asarray(vol_profile, dtype=float)
    if vol.sum() == 0:
        return "Undefined", {"reason": "zero_volume"}
    # smooth to remove tiny bumps
    vol_smooth = gaussian_filter1d(vol, sigma=smooth_sigma)
    res['vol_max'] = vol_smooth.max()
    # normalize
    vol_norm = vol_smooth / vol_smooth.max()

    # 1) Peak detection with prominence & relative height thresholds
    promin = peak_prominence_rel * vol_norm.max()  # absolute in [0..1]
    height_th = peak_rel_height  # relative to normalized max (0..1)
    peaks, props = find_peaks(vol_norm, height=height_th, prominence=promin, distance=1)
    peak_heights = props.get('peak_heights', vol_norm[peaks] if len(peaks)>0 else np.array([]))
    res['peaks_idx'] = peaks.tolist()
    res['peaks_heights'] = peak_heights.tolist()

    # if no significant peaks -> undefined
    if len(peaks) == 0:
        return "Undefined", res

    # sort peaks by height desc
    order = np.argsort(peak_heights)[::-1]
    peaks_sorted = peaks[order]
    heights_sorted = peak_heights[order]

    # if multiple peaks, check relative height of top2 vs top1
    if len(heights_sorted) >= 2:
        ratio_2_to_1 = heights_sorted[1] / heights_sorted[0] if heights_sorted[0] > 0 else 0
        res['second_vs_first_ratio'] = ratio_2_to_1
    else:
        res['second_vs_first_ratio'] = 0.0

    # 2) Use GMM (if available) to detect number of components robustly
    gmm_info = {}
    if SKLEARN_AVAILABLE:
        # We'll fit GMM on price levels using vol as sample_weight
        X = price_levels.reshape(-1, 1)
        best_bic = np.inf
        best_gmm = None
        for k in range(1, min(gmm_max_components, len(price_levels)) + 1):
            try:
                g = GaussianMixture(n_components=k, covariance_type='full', random_state=0)
                # sklearn's fit accepts sample_weight in recent versions
                try:
                    g.fit(X, sample_weight=vol)  # weighted fit
                except TypeError:
                    # older sklearn versions might not accept sample_weight -> repeat samples
                    reps = np.clip((vol / vol.max() * 200).astype(int), 1, 200)
                    X_rep = np.repeat(X, reps, axis=0)
                    g.fit(X_rep)
                bic = g.bic(X)
                if bic < best_bic:
                    best_bic = bic
                    best_gmm = g
            except Exception as e:
                continue
        if best_gmm is not None:
            weights = best_gmm.weights_
            means = best_gmm.means_.flatten()
            covs = best_gmm.covariances_.flatten() if best_gmm.covariances_.ndim == 1 else np.array([cov.flatten()[0] for cov in best_gmm.covariances_])
            # sort components by mean
            idx = np.argsort(means)
            weights = weights[idx]
            means = means[idx]
            gmm_info = {"n_components": best_gmm.n_components, "weights": weights.tolist(), "means": means.tolist()}
        else:
            gmm_info = {"n_components": None}
    else:
        gmm_info = {"sklearn": False}
    res['gmm'] = gmm_info

    # Decision logic:
    # a) If sklearn GMM says >=2 components and both components have reasonable weight -> Double
    if SKLEARN_AVAILABLE and gmm_info.get("n_components") and gmm_info["n_components"] >= 2:
        weights = np.array(gmm_info["weights"])
        # consider only components with weight above threshold
        significant = weights[weights >= gmm_weight_threshold]
        res['gmm_significant_weights'] = significant.tolist()
        if len(significant) >= 2:
            return "Double", res
        # else fallthrough to peak-based logic

    # b) Peak-based: if multiple peaks and second peak height is not tiny vs first -> Double
    if len(heights_sorted) >= 2 and res['second_vs_first_ratio'] >= secondary_peak_ratio:
        return "Double", res

    # c) Otherwise treat as unimodal. Decide D / P / b by weighted skewness of volume across price_levels
    w_skew = weighted_skewness(price_levels, vol_smooth)
    res['weighted_skewness'] = float(w_skew)
    # thresholds for skew to classify
    skew_thresh = 0.2
    if abs(w_skew) < skew_thresh:
        shape = "D"
    elif w_skew > 0:
        shape = "P"   # skew > 0 means tail to the right? interpret as top-heavy -> P
    else:
        shape = "b"
    return shape, res


# ---------- Integrate into analyze_vp_per_day ----------
def analyze_vp_per_day_advanced(df, bins=24, value_area=0.7):
    df = df.copy()
    df.index = pd.to_datetime(df.index)
    results = []
    diagnostics = {}

    for day, df_day in df.groupby(df.index.date):
        # compute volume profile: use bin centers and accumulate volume for bins that nandle high-low range
        price_min, price_max = df_day['Low'].min(), df_day['High'].max()
        if price_max <= price_min:
            continue
        price_bins = np.linspace(price_min, price_max, bins + 1)
        price_levels = (price_bins[:-1] + price_bins[1:]) / 2
        vol_profile = np.zeros(len(price_levels), dtype=float)

        # Distribute each candle's volume to bins it covers (simple inclusive method on Close could be replaced by more complex)
        for _, r in df_day.iterrows():
            # mark bins which price range intersects (Low..High)
            mask = (price_levels >= r['Low']) & (price_levels <= r['High'])
            if mask.any():
                vol_profile[mask] += r['Volume'] / mask.sum()
            else:
                # fallback: assign to nearest bin by close
                idx = np.argmin(np.abs(price_levels - r['Close']))
                vol_profile[idx] += r['Volume']

        # POC
        poc_price = price_levels[np.argmax(vol_profile)]

        # VAH / VAL: compute value area (70%) by sorting bins desc
        total_vol = vol_profile.sum()
        if total_vol > 0:
            sorted_idx = np.argsort(vol_profile)[::-1]
            cum = np.cumsum(vol_profile[sorted_idx])
            cutoff = value_area * total_vol
            vals_idx = sorted_idx[cum <= cutoff]
            if len(vals_idx) == 0:
                # ensure at least the top bin included
                vals_idx = np.array([sorted_idx[0]])
            vah = price_levels[vals_idx].max()
            val = price_levels[vals_idx].min()
        else:
            vah = val = np.nan

        # detect shape advanced
        shape, diag = detect_vp_shape_advanced(price_levels, vol_profile)
        diag.update({"poc": float(poc_price), "vah": float(vah), "val": float(val)})
        diagnostics[str(pd.Timestamp(day).date())] = diag

        results.append({
            "Date": pd.Timestamp(day),
            "POC": float(poc_price),
            "VAH": float(vah),
            "VAL": float(val),
            "Shape": shape
        })

    res_df = pd.DataFrame(results).sort_values("Date").reset_index(drop=True)
    return res_df, diagnostics
