In [1]:
%%bash
cat > Vol_Adj_Trend_Analysis_Cleanup.py << 'EOF'
# Vol_Adj_Trend_Analysis_Cleanup

# --- 1. SETUP CELL ---
import logging, sys, os, warnings, random
import math
import numpy as np
import pandas as pd
import ipywidgets as widgets
from ipywidgets import VBox, HBox
from IPython.display import display, clear_output
from ipyfilechooser import FileChooser
import datetime

# Configure the root logger: records at INFO level and above are written to
# stdout, formatted as "<LEVEL>: <message>".
logging.basicConfig(
    stream=sys.stdout,
    level=logging.INFO,
    format="%(levelname)s: %(message)s"
)

# --- 2. Identify Risk-Free Fund ---
def identify_risk_free_fund(df):
    """
    Identify the risk-free column: the candidate with the smallest standard
    deviation.

    Candidates are every column except the first one (the first column is
    skipped unconditionally; presumably it holds the dates — confirm against
    the caller). Values are coerced to numeric, so text columns never win.
    A column with fewer than two usable observations has no defined standard
    deviation and is ranked last instead of poisoning the comparison with NaN.

    Parameters:
        df: DataFrame whose columns after the first are return series.

    Returns:
        The name of the column with the smallest standard deviation. On ties
        the earliest column wins.

    Raises:
        ValueError: if there is no column after the first one.
    """
    cols = df.columns[1:]
    if len(cols) == 0:
        raise ValueError("No candidate columns to pick a risk-free fund from.")

    stdevs = {}
    for col in cols:
        values = pd.to_numeric(df[col], errors="coerce").dropna()
        spread = values.std() if len(values) > 1 else np.nan
        # NaN never compares as "smaller", so min() would keep a NaN entry it
        # met first; map undefined spreads to +inf so real values always win.
        stdevs[col] = np.inf if pd.isna(spread) else spread

    rf = min(stdevs, key=stdevs.get)
    logging.info(f"Risk-free column: {rf}")
    return rf

# --- 3. Robust CSV Reader ---
def robust_read_csv(path):
    """
    Load a CSV, falling back to progressively more lenient parsers.

    Strategies, tried in order:
    – Default engine
    – BOM-stripped ("utf-8-sig") with the Python engine
    – Same as above, additionally skipping blank and malformed lines

    Each failed attempt is logged as a warning so that a silent downgrade to
    the lenient parser (which can drop rows) is visible to the user.

    Parameters:
        path: location of the CSV file, passed straight to pandas.read_csv.

    Returns:
        The parsed DataFrame from the first strategy that succeeds.

    Raises:
        FileNotFoundError, IsADirectoryError, PermissionError: immediately,
            because no parsing strategy can recover from these.
        Exception: whatever the final, most lenient attempt raises.
    """
    fallbacks = (
        ("default engine", {}),
        (
            "BOM-stripped Python engine",
            {"sep": ",", "encoding": "utf-8-sig", "engine": "python"},
        ),
    )
    for label, options in fallbacks:
        try:
            return pd.read_csv(path, **options)
        except (FileNotFoundError, IsADirectoryError, PermissionError):
            # Retrying with another parser cannot fix an unreadable path.
            raise
        except Exception as exc:
            # Deliberate best-effort: record why, then try the next strategy.
            logging.warning(
                "read_csv with %s failed for %s: %s", label, path, exc
            )

    # Last resort: errors from this attempt propagate to the caller.
    return pd.read_csv(
        path,
        sep=",",
        engine="python",
        encoding="utf-8-sig",
        skip_blank_lines=True,
        on_bad_lines="skip",
    )

# --- 4. Utility Functions ---
def consecutive_gaps(series, threshold=3):
    """
    Report whether the series contains a run of at least `threshold`
    back-to-back missing values.

    Walks the values in order, tracking the length of the current run of
    NaNs, and stops at the first run that reaches the threshold.
    """
    run_length = 0
    for value in series:
        if pd.isna(value):
            run_length += 1
        else:
            # Any present value breaks the run.
            run_length = 0
        if run_length >= threshold:
            return True
    return False

def fill_short_gaps_with_zero(series):
    """
    Return a copy of the series in which every run of one or two NaNs is
    replaced by 0.0; runs of three or more NaNs are left untouched.
    """
    is_gap = series.isna()
    # Every present value starts a new group; the NaNs that follow it (and any
    # leading NaNs, which land in group 0) share that group's id.
    run_id = (~is_gap).cumsum()
    # Number of NaNs in the group each position belongs to.
    gap_size = is_gap.astype(int).groupby(run_id).transform('sum')

    filled = series.copy()
    short_gap = is_gap & (gap_size <= 2)
    filled[short_gap] = 0.0
    return filled

# --- 5. Annualized Metrics ---
def annualize_return(m_returns):
    """
    Geometric annualized return from a series of monthly returns (decimals).

    Missing months are dropped before compounding. Returns NaN when nothing
    is left, and -1.0 when the compounded growth factor is not positive.
    """
    observed = m_returns.dropna()
    if observed.empty:
        return np.nan

    n_months = len(observed)
    compounded = (1 + observed).prod()
    if compounded > 0:
        # Rescale the n-month growth factor to a 12-month horizon.
        return (compounded ** (12.0 / n_months)) - 1
    return -1.0

def annualize_volatility(m_returns):
    """
    Annualized volatility: sample standard deviation of the monthly returns
    scaled by sqrt(12). NaN when fewer than two observations remain after
    dropping missing values.
    """
    observed = m_returns.dropna()
    if len(observed) <= 1:
        return np.nan
    return observed.std() * np.sqrt(12)

def sharpe_ratio(m_returns, rf_series):
    """
    Annualized Sharpe ratio: annualized (geometric) excess return divided by
    the annualized volatility of the monthly excess returns.

    Only months where both the fund and the risk-free series have a value are
    used. Returns NaN with fewer than two such months, when the compounded
    excess growth is not positive, or when the excess volatility is not
    positive.
    """
    paired = pd.DataFrame({'r': m_returns, 'rf': rf_series}).dropna()
    n_months = len(paired)
    if n_months < 2:
        return np.nan

    excess = paired['r'] - paired['rf']
    compounded = (1 + excess).prod()
    if compounded > 0:
        ann_excess = (compounded ** (12.0 / n_months)) - 1
    else:
        ann_excess = np.nan

    ann_vol = excess.std() * np.sqrt(12)
    if ann_vol > 0:
        return ann_excess / ann_vol
    return np.nan

def sortino_ratio(m_returns, rf_series):
    """
    Annualized Sortino ratio: annualized (geometric) excess return divided by
    the annualized downside deviation.

    Only months where both the fund and the risk-free series have a value are
    used. The downside deviation is the sample standard deviation of the
    negative monthly excess returns, scaled by sqrt(12).

    Returns:
        NaN with fewer than two paired months, when the compounded excess
        growth is not positive, or when the downside deviation is zero or
        undefined (e.g. a single negative month). The zero/undefined guard
        mirrors sharpe_ratio and avoids a division by zero that would yield
        ±inf. When there are no negative months at all the downside is
        treated as infinite, so the ratio collapses to 0.0.
    """
    df = pd.DataFrame({'r': m_returns, 'rf': rf_series}).dropna()
    if len(df) < 2:
        return np.nan
    excess = df['r'] - df['rf']
    growth = (1 + excess).prod()
    months = len(excess)
    ann_ret = (growth ** (12.0 / months)) - 1 if growth > 0 else np.nan

    downs = excess[excess < 0]
    if downs.empty:
        # No losing months: keep the historical convention (infinite
        # downside deviation), which yields 0.0 for a defined return.
        return ann_ret / np.inf

    down_stdev = downs.std() * np.sqrt(12)
    # NaN (one losing month) and 0.0 (identical losses) both fail this test.
    return ann_ret / down_stdev if down_stdev > 0 else np.nan

def max_drawdown(m_returns):
    """
    Maximum drawdown from monthly returns, as a positive fraction.

    The wealth path is compounded from a starting value of 1.0, and that
    starting value counts as the first peak. A loss in the very first
    month(s) is therefore measured against the initial capital instead of
    being reported as zero drawdown.

    Returns:
        The largest peak-to-trough decline (0.0 if wealth never falls), or
        NaN when there are no non-missing returns.
    """
    vals = m_returns.dropna()
    if vals.empty:
        return np.nan
    wealth = (1 + vals).cumprod()
    # The running peak can never be below the initial wealth of 1.0.
    peak = wealth.cummax().clip(lower=1.0)
    dd = 1 - wealth / peak
    return dd.max()

# --- 6. Select Funds ---
def select_funds(
    df,
    rf_col,
    fund_columns,
    in_sdate, in_edate,
    out_sdate, out_edate,
    selection_mode='all',
    random_n=8
):
    """
    Choose the funds eligible for analysis.

    – Start with fund_columns
    – Drop any whose name contains 'index' (case-insensitive)
    – Keep only funds with no NaN in either the in-sample or the
      out-of-sample window (label-based, inclusive slices of df)
    – Return either all of them or a random subset

    Because a fund with any NaN in a window is rejected outright, no separate
    scan for runs of consecutive NaNs is needed.

    Parameters:
        df: DataFrame of returns, indexed so that `df.loc[start:end, col]`
            selects a window.
        rf_col: accepted for interface compatibility; not used here.
            NOTE(review): the risk-free column is not excluded from the
            selection — confirm the caller removes it from fund_columns.
        fund_columns: candidate column names.
        in_sdate, in_edate: bounds of the in-sample window.
        out_sdate, out_edate: bounds of the out-of-sample window.
        selection_mode: 'random' draws random_n funds; any other value
            (including the default 'all') returns every valid fund.
        random_n: size of the random subset.

    Returns:
        List of selected column names. In 'random' mode, when no more than
        random_n funds are valid, all of them are returned in their original
        order; a warning is logged only when there are strictly fewer.
    """
    candidates = [f for f in fund_columns if 'index' not in f.lower()]
    valid = [
        f for f in candidates
        if df.loc[in_sdate:in_edate, f].notna().all()
        and df.loc[out_sdate:out_edate, f].notna().all()
    ]

    if selection_mode != 'random':
        return valid

    if len(valid) < random_n:
        logging.warning(
            f"Fewer valid funds ({len(valid)}) than requested ({random_n}), returning all."
        )
        return valid
    if len(valid) == random_n:
        # Exactly enough funds: nothing to sample and nothing to warn about.
        return valid
    return random.sample(valid, random_n)

# --- 7. Custom Weights UI ---
def get_custom_weights(selected_funds):
    """
    Show one integer input (0–100) per fund plus a Confirm button, and return
    the dict that will receive the confirmed weights.

    The returned dict is empty when this function returns. It is filled with
    fund -> weight (as a fraction, value / 100) when Confirm is clicked and
    the inputs sum to exactly 100; otherwise it is emptied and the label
    below the button shows the offending total.
    """
    weight_widgets = {
        fund: widgets.BoundedIntText(
            value=0,
            min=0,
            max=100,
            description=fund,
            layout=widgets.Layout(width='250px')
        )
        for fund in selected_funds
    }
    confirm = widgets.Button(description='Confirm', button_style='success')
    error_lbl = widgets.Label(layout=widgets.Layout(color='red'))
    display(VBox([*weight_widgets.values(), confirm, error_lbl]))

    # Shared with the click handler; callers hold a reference to this object.
    weights = {}

    def on_confirm(_):
        total = sum(box.value for box in weight_widgets.values())
        if total == 100:
            weights.update(
                (fund, box.value / 100.0)
                for fund, box in weight_widgets.items()
            )
            error_lbl.value = "Weights confirmed"
            return
        error_lbl.value = f"Weights sum to {total}, must be 100."
        weights.clear()

    confirm.on_click(on_confirm)
    return weights

# --- 8. run_analysis ---
def run_analysis(
    df,
    in_start, in_end,
    out_start, out_end,
    target_vol,
    monthly_cost,
    selection_mode='all',
    random_n=8,
    custom_weights=None
):
    """
    Placeholder for the analysis entry point.

    NOTE: this copy of the function has no body — as written it does nothing
    and returns None. The steps below describe the intended implementation,
    which is not part of this file:

    – Parse YYYY-MM inputs to month-end timestamps
    – Ensure Date column is datetime64
    – Identify risk-free column
    – Slice DataFrames for in/out samples
    – Select funds and compute scale factors
    – Scale returns, compute stats, and build results dict

    Parameters (meanings inferred from names and the step list — confirm
    against the real implementation):
        df: input returns DataFrame.
        in_start, in_end: in-sample window bounds (presumably 'YYYY-MM').
        out_start, out_end: out-of-sample window bounds (presumably 'YYYY-MM').
        target_vol: volatility target used when scaling returns.
        monthly_cost: per-month cost applied to returns.
        selection_mode, random_n: presumably forwarded to select_funds.
        custom_weights: optional fund -> weight mapping; defaults to None.
    """
    # Implementation omitted here for brevity; the full version is the one
    # in the notebook.

# --- 9. Export & UI Callback ---
def export_to_excel(
    results, df, fname,
    in_start, in_end, out_start, out_end
):
    """
    Placeholder for the Excel export.

    NOTE: this copy of the function has no body — as written it does nothing
    and returns None. Intended behavior, per the original description:
    single-writer export including portfolio sheets and indices blocks.

    Parameters (meanings inferred from names — confirm against the real
    implementation):
        results: presumably the results dict built by run_analysis.
        df: the source returns DataFrame.
        fname: output file name.
        in_start, in_end, out_start, out_end: sample window bounds.
    """
    # Implementation omitted here for brevity; the full version is the one
    # in the notebook.
EOF

echo "Written: Vol_Adj_Trend_Analysis_Cleanup.py"


Written: Vol_Adj_Trend_Analysis_Cleanup.py
