<a href="https://colab.research.google.com/github/sabire113/Master/blob/main/Line%C3%A6r_10_mars.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import pandas as pd
import numpy as np
import statsmodels.api as sm
import warnings
from sklearn.preprocessing import StandardScaler, RobustScaler
from sklearn.linear_model import Lasso, Ridge, ElasticNet
from sklearn.metrics import r2_score
import matplotlib.pyplot as plt

# Suppress every DeprecationWarning for the rest of the session.
warnings.filterwarnings("ignore", category=DeprecationWarning)
pd.set_option("display.max_rows", 200)  # Show up to 200 rows when printing

# Helper for winsorizing a series
def winsorize_series(s, lower_quantile=0.01, upper_quantile=0.99):
    """
    Clip a series to the values found at two of its own quantiles.

    Both bounds are computed from *s* itself; values below the lower bound are
    raised to it and values above the upper bound are lowered to it.

    Args:
        s: pandas Series to clip.
        lower_quantile: quantile (0-1) that defines the lower bound.
        upper_quantile: quantile (0-1) that defines the upper bound.

    Returns:
        A new Series with the same index as *s*.
    """
    # One quantile call yields both bounds, in the order requested.
    bounds = s.quantile([lower_quantile, upper_quantile])
    return s.clip(bounds.iloc[0], bounds.iloc[1])

def load_and_clean_data(file_path):
    """
    Load the stock-level CSV, derive excess returns and characteristics, and clean it.

    Excess return is computed per row as

        Excess_Return = MonthlyReturn (decimal) - ((Norges Bank 10Y Yield / 100) / 12)

    which, according to the original author's note, follows the reference
    article's method for pooled cross-sectional analysis.

    Processing order:
      1. Parse 'Date' (unparseable values become NaT) and sort by it.
      2. Drop the unused 'Instrument' and 'First Trade Date' columns if present.
      3. Drop rows lacking a return or a risk-free rate, convert both to
         monthly decimals and compute 'Excess_Return'.
      4. Build 'Size' (log market cap), 'BM' (log book-to-market) and 'Mom12m'.
         Rows whose market cap or book-to-market is not strictly positive are
         removed, because their logarithm is undefined or infinite.
      5. Winsorize every numeric column except 'Excess_Return' at 1%/99%.
      6. Drop every row that still contains a missing value.

    Args:
        file_path: path (or anything accepted by ``pd.read_csv``) of the input CSV.

    Returns:
        The cleaned DataFrame, sorted by 'Date'.
    """
    df = pd.read_csv(file_path, low_memory=False)
    print("Kolonnenavn i datasettet:")
    print(df.columns.tolist())

    # Convert 'Date' to datetime and sort chronologically.
    df['Date'] = pd.to_datetime(df['Date'], errors='coerce')
    df = df.sort_values('Date')

    # Remove columns that are not used (silently skipped when absent).
    df = df.drop(columns=['Instrument', 'First Trade Date'], errors='ignore')

    # Individual stock return and the risk-free rate used for excess returns.
    stock_return_col = "MonthlyReturn"
    risk_free_col = "Norges Bank 10Y Yield"

    # Rows without either value cannot produce an excess return.
    df = df.dropna(subset=[stock_return_col, risk_free_col])

    # Convert returns from percent to decimals when needed.
    # NOTE(review): "any value above 1" is a heuristic; a genuine decimal return
    # above 100% would trigger the conversion for the whole column.
    if df[stock_return_col].max() > 1:
        print("Konverterer MonthlyReturn fra prosent til desimaler")
        df[stock_return_col] = df[stock_return_col] / 100

    # Risk-free rate: values are assumed to be quoted in percent (1.0 = 1%),
    # so divide by 100 for a decimal and by 12 for a monthly rate.
    # NOTE(review): if every yield in the sample is at most 1 the column is
    # treated as already being in decimals - confirm against the data source.
    if df[risk_free_col].max() > 1:
        print("Konverterer Norges Bank 10Y Yield fra prosent (årlig) til månedlig desimal")
        df[risk_free_col] = (df[risk_free_col] / 100) / 12
    else:
        print("Norges Bank 10Y Yield antas å være i desimalformat (årlig); konverterer til månedlig ved å dele med 12")
        df[risk_free_col] = df[risk_free_col] / 12

    # Individual excess return: stock return minus the monthly risk-free rate.
    df['Excess_Return'] = df[stock_return_col] - df[risk_free_col]

    # Stock characteristics.
    # A zero market cap would give Size = -inf and BM = +inf, neither of which
    # is removed by dropna() below; keep only strictly positive caps. The
    # explicit copy makes the following column assignments act on an
    # independent frame instead of a filtered slice.
    df = df[df["MarketCap"] > 0].copy()
    df["Size"] = np.log(df["MarketCap"])
    df["BM"] = (df["BookValuePerShare"] * df["CommonSharesOutstanding"]) / df["MarketCap"]
    # log(BM) is only defined for positive book-to-market ratios.
    df = df[df["BM"] > 0].copy()
    df["BM"] = np.log(df["BM"])
    df["Mom12m"] = df["Momentum_12M"]
    df = df.drop(columns=["Momentum_12M"])

    # Winsorize all numeric variables except the target 'Excess_Return'.
    num_cols = [
        col for col in df.select_dtypes(include=[np.number]).columns
        if col != "Excess_Return"
    ]
    for col in num_cols:
        df[col] = winsorize_series(df[col])

    df = df.dropna()
    print("Etter rensing, dataframe-shape:", df.shape)
    return df

def split_stock_level_data_3way(df):
    """
    Split the data by row position into train (60%), validation (20%) and test (20%).

    The split uses ``iloc`` on the incoming row order, so it is chronological
    only when *df* is already sorted by date.

    Features are every numeric column except 'Excess_Return', 'MonthlyReturn'
    and 'Norges Bank 10Y Yield'. They are standardised with a StandardScaler
    that is fitted on the training rows only and then applied to all rows, so
    no validation/test statistics leak into the features. The target is
    winsorized at 5%/95% and scaled with a RobustScaler fitted on the training
    target.

    Args:
        df: cleaned DataFrame containing 'Excess_Return', 'Date',
            'MonthlyReturn' and 'Norges Bank 10Y Yield'.

    Returns:
        A 12-tuple:
        (X_train, y_train, y_train_scaled,
         X_val, y_val, y_val_scaled,
         X_test, y_test, y_test_scaled,
         scaler_X, y_train_median, y_train_scale)
        where scaler_X is the feature scaler fitted on the training rows, and
        y_train_median / y_train_scale are the RobustScaler centre and scale
        needed to map scaled predictions back to the original target units.
        Any later transform of raw features for a model trained on these
        outputs should use scaler_X so the scaling matches.
    """
    X = df.drop(columns=["Excess_Return", "Date", "MonthlyReturn", "Norges Bank 10Y Yield"])
    X = X.select_dtypes(include=[np.number])

    # NOTE(review): the target's clip bounds are still computed on the full
    # sample (train + validation + test); confirm whether that is intended.
    y = winsorize_series(df["Excess_Return"], lower_quantile=0.05, upper_quantile=0.95)

    n = len(df)
    train_end = int(n * 0.6)
    val_end = int(n * 0.8)

    # Fit the feature scaler on the training rows only, then apply it to every
    # row; fitting on the full sample would leak future means/variances.
    scaler_X = StandardScaler().fit(X.iloc[:train_end])
    X_scaled = pd.DataFrame(scaler_X.transform(X), index=X.index, columns=X.columns)

    X_train = X_scaled.iloc[:train_end]
    y_train = y.iloc[:train_end]
    X_val = X_scaled.iloc[train_end:val_end]
    y_val = y.iloc[train_end:val_end]
    X_test = X_scaled.iloc[val_end:]
    y_test = y.iloc[val_end:]

    # The target scaler is likewise fitted on the training target only.
    scaler_y = RobustScaler()
    y_train_scaled = pd.Series(scaler_y.fit_transform(y_train.values.reshape(-1, 1)).flatten(), index=y_train.index)
    y_val_scaled = pd.Series(scaler_y.transform(y_val.values.reshape(-1, 1)).flatten(), index=y_val.index)
    y_test_scaled = pd.Series(scaler_y.transform(y_test.values.reshape(-1, 1)).flatten(), index=y_test.index)

    y_train_median = scaler_y.center_[0]
    y_train_scale = scaler_y.scale_[0]

    print("Treningssett:", X_train.shape)
    print("Valideringssett:", X_val.shape)
    print("Testsett:", X_test.shape)

    return X_train, y_train, y_train_scaled, X_val, y_val, y_val_scaled, X_test, y_test, y_test_scaled, scaler_X, y_train_median, y_train_scale

def run_ols(X_train, y_train_scaled, X_test, y_test, y_train_median, y_train_scale):
    """
    Fit a pooled OLS model, print its full regression summary and score it on the test set.

    An intercept column is added to both design matrices. The model is fitted
    on the scaled training target with HAC standard errors (maxlags=1). Test
    predictions are mapped back to the original target units with
    ``pred * y_train_scale + y_train_median`` before R² is computed against
    *y_test*.

    Returns:
        (fitted_model, test_r2)
    """
    design_train, design_test = (
        sm.add_constant(part, has_constant='add') for part in (X_train, X_test)
    )

    fitted = sm.OLS(y_train_scaled, design_train).fit(
        cov_type='HAC',
        cov_kwds={'maxlags': 1},
    )
    print("\nOLS-regresjonssammendrag:")
    print(fitted.summary())

    # Undo the target scaling so R² is measured in original units.
    predictions = fitted.predict(design_test) * y_train_scale + y_train_median
    return fitted, r2_score(y_test, predictions)

def tune_lasso(X_train, y_train_scaled, X_val, y_val_scaled, alphas):
    """
    Pick the Lasso penalty with the lowest validation MSE.

    One Lasso (max_iter=20000) is fitted per value in *alphas* on the training
    data and scored on the validation data (scaled target). Ties keep the
    earliest alpha. The winning alpha and its coefficients are printed.

    Returns:
        (best_model, best_alpha, best_validation_mse)
    """
    winner = {"model": None, "alpha": None, "mse": np.inf}

    for alpha in alphas:
        candidate = Lasso(alpha=alpha, max_iter=20000)
        candidate.fit(X_train, y_train_scaled)
        val_mse = np.mean((y_val_scaled - candidate.predict(X_val)) ** 2)
        if not val_mse < winner["mse"]:
            continue
        winner = {"model": candidate, "alpha": alpha, "mse": val_mse}

    print("\nLasso best alpha:", winner["alpha"])
    print("Lasso koeffisienter:")
    print(pd.Series(winner["model"].coef_, index=X_train.columns))
    return winner["model"], winner["alpha"], winner["mse"]

def tune_ridge(X_train, y_train_scaled, X_val, y_val_scaled, alphas):
    """
    Pick the Ridge penalty with the lowest validation MSE.

    Each alpha is fitted on the training data (max_iter=20000) and evaluated
    on the validation data using the scaled target; the first alpha reaching
    the minimum wins. The chosen alpha and its coefficients are printed.

    Returns:
        (best_model, best_alpha, best_validation_mse)
    """
    # Running best as (validation MSE, alpha, fitted model).
    best = (np.inf, None, None)

    for penalty in alphas:
        ridge = Ridge(alpha=penalty, max_iter=20000)
        ridge.fit(X_train, y_train_scaled)
        residuals = y_val_scaled - ridge.predict(X_val)
        score = np.mean(residuals ** 2)
        if score < best[0]:
            best = (score, penalty, ridge)

    best_score, best_alpha, best_model = best
    print("\nRidge best alpha:", best_alpha)
    print("Ridge koeffisienter:")
    print(pd.Series(best_model.coef_, index=X_train.columns))
    return best_model, best_alpha, best_score

def tune_elasticnet(X_train, y_train_scaled, X_val, y_val_scaled, alphas, l1_ratios):
    """
    Grid-search ElasticNet over (alpha, l1_ratio) by validation MSE.

    Every combination is fitted on the training data (max_iter=20000) and
    scored on the validation data using the scaled target. Combinations are
    visited alpha-major, and the first one reaching the minimum is kept. The
    chosen hyper-parameters and coefficients are printed.

    Returns:
        (best_model, best_alpha, best_l1_ratio, best_validation_mse)
    """
    grid = [(alpha, ratio) for alpha in alphas for ratio in l1_ratios]

    lowest_mse = np.inf
    chosen = (None, None, None)  # (alpha, l1_ratio, fitted model)
    for alpha, ratio in grid:
        enet = ElasticNet(alpha=alpha, l1_ratio=ratio, max_iter=20000)
        enet.fit(X_train, y_train_scaled)
        squared_errors = (y_val_scaled - enet.predict(X_val)) ** 2
        current_mse = np.mean(squared_errors)
        if current_mse < lowest_mse:
            lowest_mse = current_mse
            chosen = (alpha, ratio, enet)

    best_alpha, best_l1_ratio, best_model = chosen
    print("\nElasticNet best alpha:", best_alpha, "best l1_ratio:", best_l1_ratio)
    print("ElasticNet koeffisienter:")
    print(pd.Series(best_model.coef_, index=X_train.columns))
    return best_model, best_alpha, best_l1_ratio, lowest_mse

def bottom_up_portfolio_evaluation(df, model, scaler_sel, selected_features, y_train_median, y_train_scale, n_deciles=10):
    """
    Build a bottom-up long-short portfolio from model predictions.

    Steps:
      1. Scale the selected features with *scaler_sel*, add an intercept
         column and predict each row's excess return with *model*; the scaled
         prediction is mapped back with ``pred * y_train_scale + y_train_median``.
      2. Group rows by calendar month ('YearMonth') and sort each month's
         stocks into *n_deciles* buckets by predicted excess return.
      3. For each month compute the long-short return: mean realised
         'Excess_Return' of the top bucket minus that of the bottom bucket.

    Months with fewer than *n_deciles* rows are skipped. The caller's *df* is
    not modified.

    Args:
        df: DataFrame with 'Date', 'Excess_Return' and the selected features.
        model: fitted model exposing ``predict`` on a matrix with a leading
            constant column.
        scaler_sel: object whose ``transform`` scales ``df[selected_features]``.
        selected_features: feature column names, in the order the model expects.
        y_train_median, y_train_scale: centre and scale used to undo the
            target scaling.
        n_deciles: number of buckets per month.

    Returns:
        (months, portfolio_returns): two parallel lists of monthly Periods and
        long-short returns.
    """
    X_selected = scaler_sel.transform(df[selected_features])
    X_const = sm.add_constant(X_selected, has_constant='add')
    # Force a plain array so the assignment below is positional and never
    # re-aligned on an index.
    y_pred_scaled = np.asarray(model.predict(X_const))

    df = df.copy()
    df['Predicted_Excess'] = y_pred_scaled * y_train_scale + y_train_median

    # Calendar-month key used for grouping.
    df['YearMonth'] = df['Date'].dt.to_period('M')

    months = []
    portfolio_returns = []
    for period, group in df.groupby('YearMonth'):
        if len(group) < n_deciles:
            continue
        # Bucket on first-occurrence ranks rather than raw predictions: tied
        # predictions would otherwise create duplicate quantile edges and make
        # qcut raise. This is intended to leave the bucketing unchanged when
        # all predictions in the month are distinct.
        ranks = group['Predicted_Excess'].rank(method='first')
        decile = pd.qcut(ranks, n_deciles, labels=False) + 1
        decile_top = group.loc[decile == n_deciles, 'Excess_Return'].mean()
        decile_bottom = group.loc[decile == 1, 'Excess_Return'].mean()
        portfolio_returns.append(decile_top - decile_bottom)
        months.append(period)

    return months, portfolio_returns

def print_monthly_returns(df):
    """
    Print and return a per-month summary of 'Excess_Return'.

    Rows are grouped by the calendar month of 'Date'. The caller's DataFrame
    is left untouched: the month key is derived on the fly instead of being
    stored as a new column on *df*.

    Args:
        df: DataFrame with a datetime 'Date' column and an 'Excess_Return' column.

    Returns:
        DataFrame indexed by 'YearMonth' (monthly Period) with columns
        'Avg_Excess_Return' (mean), 'Std_Excess_Return' (sample standard
        deviation) and 'Count' (number of non-missing observations).
    """
    # Grouping by a named Series avoids mutating the input frame.
    year_month = df['Date'].dt.to_period('M').rename('YearMonth')
    monthly_summary = (
        df.groupby(year_month)['Excess_Return']
        .agg(['mean', 'std', 'count'])
        .rename(columns={
            'mean': 'Avg_Excess_Return',
            'std': 'Std_Excess_Return',
            'count': 'Count',
        })
    )
    print("\nMånedlig overskuddsavkastning:")
    print(monthly_summary.to_string())
    return monthly_summary

def investigate_extreme_month(df, model, scaler_sel, selected_features, y_train_median, y_train_scale, target_month="2016-10", n_deciles=10):
    """
    Inspect the top and bottom prediction buckets for a single month.

    The rows of *target_month* are selected, their excess returns are
    predicted (scaled prediction mapped back with
    ``pred * y_train_scale + y_train_median``), the stocks are sorted into
    *n_deciles* buckets by predicted value, and the members and mean realised
    'Excess_Return' of the top and bottom buckets are printed.

    Args:
        df: DataFrame with 'Date', 'RIC', 'MonthlyReturn', 'Excess_Return' and
            the selected features.
        model: fitted model exposing ``predict`` on a matrix with a leading
            constant column.
        scaler_sel: object whose ``transform`` scales the selected features.
        selected_features: feature column names, in the order the model expects.
        y_train_median, y_train_scale: centre and scale used to undo the
            target scaling.
        target_month: month to inspect, e.g. "2016-10".
        n_deciles: number of buckets (default 10).

    Returns:
        A copy of the month's rows with 'Predicted_Excess' and 'Decile'
        columns added, or None when the month has no rows.
    """
    # Keep only the rows belonging to the requested month.
    df_target = df[df["Date"].dt.to_period("M") == target_month].copy()
    if df_target.empty:
        print("Ingen data funnet for", target_month)
        return None

    # Predict excess returns for that month.
    X_target = scaler_sel.transform(df_target[selected_features])
    X_target_const = sm.add_constant(X_target, has_constant='add')
    # Plain array so the column assignment is positional.
    y_pred_scaled = np.asarray(model.predict(X_target_const))
    df_target["Predicted_Excess"] = y_pred_scaled * y_train_scale + y_train_median

    # Bucket on first-occurrence ranks so tied predictions cannot produce
    # duplicate quantile edges (which makes qcut raise).
    ranks = df_target["Predicted_Excess"].rank(method="first")
    df_target["Decile"] = pd.qcut(ranks, n_deciles, labels=False) + 1

    print("Antall observasjoner per decile for", target_month)
    print(df_target.groupby("Decile").size())

    # Extract the highest and lowest buckets.
    top_decile = df_target[df_target["Decile"] == n_deciles]
    bottom_decile = df_target[df_target["Decile"] == 1]

    report_cols = ["RIC", "MonthlyReturn", "Excess_Return", "Predicted_Excess"]

    print("\nTop decile for", target_month)
    print(top_decile[report_cols].sort_values("Excess_Return", ascending=False))

    print("\nBottom decile for", target_month)
    print(bottom_decile[report_cols].sort_values("Excess_Return"))

    print("\nGjennomsnittlig Excess_Return:")
    print("Top decile:", top_decile["Excess_Return"].mean())
    print("Bottom decile:", bottom_decile["Excess_Return"].mean())

    return df_target

class _ColumnSubsetScaler:
    """Apply an already-fitted StandardScaler to a subset of its columns."""

    def __init__(self, fitted_scaler, fitted_columns, columns):
        # Positions of the requested columns in the order the scaler was fitted on.
        fitted_order = list(fitted_columns)
        positions = [fitted_order.index(name) for name in columns]
        self.mean_ = np.asarray(fitted_scaler.mean_)[positions]
        self.scale_ = np.asarray(fitted_scaler.scale_)[positions]

    def transform(self, X):
        """Return ``(X - mean) / scale`` as a float array, column by column."""
        return (np.asarray(X, dtype=float) - self.mean_) / self.scale_


def main():
    """
    Run the full pipeline: load data, split, fit models, build and plot the portfolio.

    Steps:
      1. Load and clean the CSV.
      2. Split into train/validation/test.
      3. Fit a three-factor OLS baseline ('Size', 'BM', 'Mom12m') and tune
         Lasso, Ridge and ElasticNet on the validation set.
      4. Print test R² for all four models.
      5. Build the monthly long-short decile portfolio from the OLS
         predictions, print it (all months and 2016 only), print monthly
         excess-return statistics and inspect October 2016.
      6. Plot the long-short return over time.
    """
    file_path = "OSEFX_Market_Macro_Data_CrossSectional_Imputed.csv"
    df = load_and_clean_data(file_path)

    # Split the data set.
    (X_train, y_train, y_train_scaled,
     X_val, y_val, y_val_scaled,
     X_test, y_test, y_test_scaled,
     scaler_X, y_train_median, y_train_scale) = split_stock_level_data_3way(df)

    selected_features = ["Size", "BM", "Mom12m"]

    # Baseline OLS model.
    ols_model, ols_test_r2 = run_ols(X_train[selected_features], y_train_scaled,
                                     X_test[selected_features], y_test,
                                     y_train_median, y_train_scale)
    print("\nOLS (3 faktorer) Test R²:", ols_test_r2)

    # Penalised methods.
    alphas = np.logspace(-4, 0, 50)
    l1_ratios = [0.1, 0.5, 0.7, 0.9, 1.0]

    lasso_model, best_alpha_lasso, _ = tune_lasso(X_train[selected_features], y_train_scaled,
                                                    X_val[selected_features], y_val_scaled, alphas)
    ridge_model, best_alpha_ridge, _ = tune_ridge(X_train[selected_features], y_train_scaled,
                                                  X_val[selected_features], y_val_scaled, alphas)
    enet_model, best_alpha_enet, best_l1_ratio, _ = tune_elasticnet(X_train[selected_features], y_train_scaled,
                                                                   X_val[selected_features], y_val_scaled, alphas, l1_ratios)

    # Test R² for the penalised methods (on the scaled target).
    lasso_r2 = r2_score(y_test_scaled, lasso_model.predict(X_test[selected_features]))
    ridge_r2 = r2_score(y_test_scaled, ridge_model.predict(X_test[selected_features]))
    enet_r2 = r2_score(y_test_scaled, enet_model.predict(X_test[selected_features]))

    results = pd.DataFrame({
        "Modell": ["OLS", "Lasso", "Ridge", "ElasticNet"],
        "Test R²": [ols_test_r2, lasso_r2, ridge_r2, enet_r2]
    })
    print("\nSammenligning av modellresultater:")
    print(results.to_string(index=False))

    # Portfolio construction.
    # Reuse the statistics of the scaler the models were trained with instead
    # of fitting a second scaler, so prediction-time features are scaled
    # exactly like the training features. X_train keeps the column order the
    # scaler was fitted on.
    scaler_sel = _ColumnSubsetScaler(scaler_X, X_train.columns, selected_features)
    months, port_returns = bottom_up_portfolio_evaluation(df, ols_model, scaler_sel,
                                                          selected_features,
                                                          y_train_median,
                                                          y_train_scale,
                                                          n_deciles=10)

    df_portfolio = pd.DataFrame({
        "YearMonth": months,
        "LongShort_Return": port_returns
    }).sort_values("YearMonth")

    print("\n--- Bottom-up Portefølje (Long-Short) tallverdier ---")
    print(df_portfolio.to_string(index=False))

    # Look specifically at all months in 2016.
    df_2016 = df_portfolio[df_portfolio["YearMonth"].dt.year == 2016]
    print("\n--- Månedlige tall for 2016 ---")
    print(df_2016.to_string(index=False))

    # Print monthly excess-return statistics for all months.
    print_monthly_returns(df)

    # Inspect the extreme month, e.g. October 2016.
    print("\n--- Undersøkelse av den ekstreme måneden (2016-10) ---")
    investigate_extreme_month(df, ols_model, scaler_sel, selected_features, y_train_median, y_train_scale, target_month="2016-10")

    # Plot the portfolio return over time (Periods converted to timestamps for matplotlib).
    df_portfolio["YearMonth"] = df_portfolio["YearMonth"].dt.to_timestamp()
    plt.figure(figsize=(14,6))
    plt.plot(df_portfolio["YearMonth"], df_portfolio["LongShort_Return"], marker="o", label="Long-Short Return")
    plt.xlabel("Dato")
    plt.ylabel("Månedlig Long-Short Avkastning")
    plt.title("Bottom-up Portefølje (Long-Short: Topp decile minus bunn decile)")
    plt.legend()
    plt.grid(True)
    plt.show()

# Run the pipeline only when this file is executed directly, not when it is imported.
if __name__ == "__main__":
    main()
