In [None]:
import plotly.graph_objects as go
import numpy as np

In [None]:
# QuantBook Analysis Tool
# For more information see [https://www.quantconnect.com/docs/v2/our-platform/research/getting-started]
# NOTE(review): QuantBook, Resolution and datetime are not imported anywhere in the
# visible notebook - presumably the QuantConnect research environment pre-loads them; confirm.
qb = QuantBook()
# Date range handed to the history request below.
start_date = datetime(2020, 1, 1)
end_date = datetime(2025, 1, 1)
# Add SPY at daily resolution and keep its symbol.
spy = qb.add_equity("SPY", Resolution.DAILY).symbol
# Request history for every key in qb.securities over the date range, then drop the
# outermost index level (presumably the symbol level, leaving a time index - confirm).
history = qb.history(qb.securities.keys(), start_date, end_date).droplevel(0)

# Bare expression so the notebook cell displays the result.
history

In [None]:
# Plot the raw closing prices against the history index before any fitting.
fig_raw = go.Figure().add_trace(
    go.Scatter(x=history.index, y=history.close)
)
fig_raw.show()

In [None]:


def fit_line(x, y):
    """Fit y ~ intercept + slope * x by ordinary least squares.

    Returns the pair ``(intercept, slope)``. When x has no spread (the sum of
    squared deviations is exactly zero) the slope is set to 0.0, so the
    intercept becomes the mean of y.
    """
    x_bar = np.mean(x)
    y_bar = np.mean(y)
    dx = x - x_bar

    spread_x = np.sum(dx**2)
    co_spread = np.sum(dx * (y - y_bar))

    # Guard against division by zero for a degenerate (constant) x.
    if spread_x != 0:
        slope = co_spread / spread_x
    else:
        slope = 0.0

    return y_bar - slope * x_bar, slope

def r2_score(x, y, intercept, slope):
    """Coefficient of determination (R²) of the line ``intercept + slope * x`` against y.

    Returns 1.0 when y has no variance around its mean (total sum of squares
    is not greater than zero), since the ratio is undefined in that case.
    """
    predicted = intercept + slope * x
    residual_ss = np.sum((y - predicted)**2)
    total_ss = np.sum((y - np.mean(y))**2)

    if total_ss > 0:
        return 1 - residual_ss/total_ss
    return 1.0

def dynamic_linear_segments_adaptive(price_series, min_points=12, r2_threshold=0.85, k_sigma=3.0, max_tol_factor=0.03):
    """
    Split a price series into consecutive straight-line pieces fitted in log-price space.

    A window anchored at the current segment start is grown one observation
    at a time and refitted with fit_line. The window is closed as a segment
    when either its R² falls below the threshold or the residual of the newest
    observation exceeds the adaptive tolerance. A new window of min_points
    observations then starts right after the cut.

    - price_series: series of prices; its index is viewed as int64 and divided
      by 1e9 (assumes a nanosecond DatetimeIndex, giving seconds - confirm)
    - min_points: minimum segment length
    - r2_threshold: minimum acceptable R² for the segment
    - k_sigma: multiplier for standard deviation of residuals
    - max_tol_factor: upper cap on the tolerance (applied to log-price residuals)

    Returns a list of dicts with keys start_idx, end_idx, start_time, end_time,
    slope, intercept and r2. For every segment after the first, the intercept
    is shifted so the segment's fitted value at its first timestamp equals the
    previous segment's fitted value at its last timestamp; r2 is always the
    score of the unshifted least-squares fit.
    """
    times = price_series.index.view(np.int64) / 1e9
    log_prices = np.log(price_series.values)
    n_obs = len(price_series)

    def build_piece(first, last, anchor):
        """Fit [first, last] and return (segment dict, fitted value at the last point)."""
        xs = times[first:last + 1]
        ys = log_prices[first:last + 1]
        b0, b1 = fit_line(xs, ys)
        quality = r2_score(xs, ys, b0, b1)

        # Re-anchor the line so it starts where the previous piece ended.
        if anchor is not None:
            b0 = anchor - b1 * xs[0]

        piece = {
            "start_idx": first,
            "end_idx": last,
            "start_time": price_series.index[first],
            "end_time": price_series.index[last],
            "slope": b1,
            "intercept": b0,
            "r2": quality
        }
        return piece, b0 + b1 * xs[-1]

    pieces = []
    anchor = None  # fitted log-price at the end of the most recent piece
    start = 0
    probe = start + min_points - 1

    while probe < n_obs:
        window_x = times[start:probe + 1]
        window_y = log_prices[start:probe + 1]

        b0, b1 = fit_line(window_x, window_y)
        fit_quality = r2_score(window_x, window_y, b0, b1)
        residuals = window_y - (b0 + b1 * window_x)
        spread = np.std(residuals) if len(residuals) > 1 else 1e-6

        # Tolerance follows residual volatility, floored at 1e-6 and capped at max_tol_factor.
        tolerance = min(max(k_sigma * spread, 1e-6), max_tol_factor)
        newest_error = abs(residuals[-1])

        if not (fit_quality < r2_threshold or newest_error > tolerance):
            # Window still fits: extend it by one observation.
            probe += 1
            continue

        # Fit broke: cut just before the newest point, but never below min_points.
        shortest_end = start + min_points - 1
        cut = max(probe - 1, shortest_end)
        piece, anchor = build_piece(start, cut, anchor)
        pieces.append(piece)

        # Begin the next window right after the cut.
        start = cut + 1
        probe = start + min_points - 1

    # Whatever remains after the last cut forms the closing piece.
    if start < n_obs:
        piece, anchor = build_piece(start, n_obs - 1, anchor)
        pieces.append(piece)

    return pieces

def plot_segments(price_series, segments):
    """Draw the price series and overlay each fitted segment in a Plotly figure.

    - price_series: the series the segments were fitted on; its index is viewed
      as int64 and divided by 1e9 to rebuild the x values used for each line
    - segments: iterable of dicts with start_idx, end_idx, start_time, slope
      and intercept keys; each line is evaluated as
      exp(intercept + slope * x) to map it back to price space

    Shows the figure and returns nothing.
    """
    epoch_seconds = price_series.index.view(np.int64) / 1e9

    fig = go.Figure()

    # Underlying prices, drawn in black.
    price_trace = go.Scatter(
        x=price_series.index,
        y=price_series.values,
        mode='lines',
        line={"color": 'black'},
        name='Price',
    )
    fig.add_trace(price_trace)

    # One overlay trace per fitted segment.
    for seg in segments:
        window = slice(seg["start_idx"], seg["end_idx"] + 1)
        fitted_log = seg["intercept"] + seg["slope"] * epoch_seconds[window]
        start_label = seg["start_time"].strftime("%Y-%m-%d")

        segment_trace = go.Scatter(
            x=price_series.index[window],
            y=np.exp(fitted_log),
            mode='lines',
            line={"width": 2},
            name=f'Segment {start_label}',
        )
        fig.add_trace(segment_trace)

    fig.update_layout(
        title="Price Series with Dynamically Fitted Line Segments",
        xaxis_title="Date",
        yaxis_title="Price",
        showlegend=False,
        template="plotly_white",
    )

    fig.show()

# --- USAGE EXAMPLE ---
# Segment the closing prices and overlay the fitted pieces on the price chart.
# The segmentation function defined in this notebook is
# dynamic_linear_segments_adaptive; the previous call to the undefined name
# dynamic_linear_segments raised NameError.
segments = dynamic_linear_segments_adaptive(history.close)
plot_segments(history.close, segments)

# Map each segment's end time to its direction flag:
# False when the slope is <= 0, True otherwise.
change_in_gradient = {}
gradient = True
for seg in segments:
    print(seg)
    gradient = not seg['slope'] <= 0
    change_in_gradient[seg['end_time']] = gradient

# Bare expression so the notebook cell displays the mapping.
change_in_gradient
