# In [None]:
import numpy as np
import pandas as pd
from pathlib import Path
from sklearn import linear_model
import pyarrow as pa
import pyarrow.dataset as ds
import shutil

# In [None]:
# Root of the Hive-partitioned features dataset written by upsert_features.
out_dir = Path("data/parquet/features")

# Destructive switch: when True, any existing features dataset is deleted
# before the directory is recreated below. Leave False for normal runs.
CLEAN_PARQUET = False

if CLEAN_PARQUET:
    if out_dir.exists():
        shutil.rmtree(out_dir)

# Idempotent: creates missing parents, tolerates an existing directory.
out_dir.mkdir(exist_ok=True, parents=True)


def compute_returns(df):
    """Return the gross one-period return ``p[t] / p[t-1]`` for every row of *df*.

    *df* may be a price Series (each list element is then a scalar) or a
    DataFrame of numeric price columns (each element is then a row Series).
    The result has exactly one entry per input row, aligned by position.
    The first entry is NaN because the first observation has no predecessor.

    Note: these are price relatives (1 + r), not net returns r.

    Returns an empty list when *df* is None or empty.
    """
    if df is None or len(df) == 0:
        return []
    # shift(1) pairs each row with its predecessor. The first row is paired with
    # NaN rather than wrapping around to the last row (iloc[-1]).
    ratios = df / df.shift(1)
    # Positional access keeps the list independent of the index labels.
    return [ratios.iloc[i] for i in range(len(ratios))]



# Location of the raw prices dataset (same data/parquet root as the features).
prices = Path("data", "parquet", "prices")
def momentum(df, batch="all", price_col="adj_close"):
    """Trailing momentum of a price history, measured as mean simple return.

    Momentum over a window of ``n`` observations is the arithmetic mean of the
    last ``n`` one-period simple returns ``p[t] / p[t-1] - 1``. Only past data
    is used (no look-ahead). When fewer than ``n`` returns exist, the mean is
    taken over all available returns; it is NaN if there are none.

    Parameters
    ----------
    df : pandas.Series or pandas.DataFrame
        Prices in chronological order. A DataFrame must contain *price_col*.
    batch : str, default "all"
        Window to compute: "20 jours" (20 obs), "3 mois" (60 obs),
        "1 an" (252 obs), or "all" for all three.
    price_col : str, default "adj_close"
        Column read when *df* is a DataFrame.

    Returns
    -------
    tuple or int
        ``(mom_20, mom_60, mom_252)`` as floats. Windows not selected by
        *batch* are ``None``. Returns ``0`` when *df* is None or empty.

    Raises
    ------
    KeyError
        If *batch* is not one of the four accepted choices.
    """
    windows = {"20 jours": 20, "3 mois": 60, "1 an": 252}
    if df is None or df.empty:
        return 0
    if batch != "all" and batch not in windows:
        raise KeyError(
            "Please choose between the 4 choices : 20 jours, 3 mois, 1 an, all"
        )

    px = df[price_col] if isinstance(df, pd.DataFrame) else df
    px = px.astype(float)
    # Backward-looking simple returns. The first price has no predecessor and is dropped.
    rets = (px / px.shift(1) - 1).dropna()

    selected = windows.keys() if batch == "all" else {batch}
    return tuple(
        float(rets.tail(n).mean()) if label in selected else None
        for label, n in windows.items()
    )



def trend_slope_60(df, window=60, price_col="adj_close"):
    """Slope of an ordinary-least-squares line through the trailing log prices.

    The regression is ``log(price) ~ a + b * t``, where ``t = 0, 1, ...``
    indexes the last *window* non-missing observations of *price_col*. The
    returned ``b`` is the average log-price change per observation, i.e. the
    continuously-compounded trend.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain *price_col*, in chronological order. Prices are expected to be
        strictly positive, since the log of a non-positive price is undefined.
    window : int, default 60
        Number of trailing observations used in the fit.
    price_col : str, default "adj_close"
        Column holding the prices.

    Returns
    -------
    float or int
        The slope as a float. Returns NaN if fewer than two prices are
        available, and ``0`` when *df* is None or empty.
    """
    if df is None or df.empty:
        return 0
    log_px = np.log(df[price_col].dropna().astype(float).tail(window).to_numpy())
    if log_px.size < 2:
        # A line needs at least two points.
        return float("nan")
    t = np.arange(log_px.size, dtype=float)
    slope, _intercept = np.polyfit(t, log_px, deg=1)
    return float(slope)


def upsert_features(df: pd.DataFrame) -> int:
    """Append the new batch to a Hive-partitioned Parquet dataset.

    Dataset layout: data/parquet/features/feature=ZZZ/ticker=XXX/year=YYYY/*.parquet

    *df* must contain the partition columns ``feature`` and ``ticker``, plus a
    ``date`` column. ``year`` is derived from ``date`` when absent, and rows
    whose date cannot be parsed are dropped in that case.

    Returns the number of rows written. Returns 0 when *df* is None or empty,
    or when no row has a parseable date.
    """
    import re

    if df is None or df.empty:
        return 0

    # Ensure required partition columns exist
    if "year" not in df.columns:
        dt = pd.to_datetime(df["date"], errors="coerce")
        ok = dt.notna()
        df = df.loc[ok].copy()
        df["year"] = dt.loc[ok].dt.year.astype("int32")
        # Every date may have failed to parse; then there is nothing to write.
        if df.empty:
            return 0

    table = pa.Table.from_pandas(df, preserve_index=False)

    partitioning = ds.partitioning(
        pa.schema(
            [
                ("feature", pa.string()),
                ("ticker", pa.string()),
                ("year", pa.int32()),
            ]
        ),
        flavor="hive",
    )

    def _token(value) -> str:
        # Timestamps stringify as e.g. "2024-01-02 00:00:00". Spaces and colons
        # (invalid on Windows) are unsafe in file names, and braces would corrupt
        # the "{i}" placeholder, so collapse anything unusual to "-".
        return re.sub(r"[^\w.-]+", "-", str(value))

    # Unique filenames per batch to avoid overwriting older fragments.
    # NOTE: ticker/feature come from the first row only; for a mixed batch
    # the directory partitions, not the file name, identify the data.
    ticker = _token(df["ticker"].iloc[0])
    feature_name = _token(df["feature"].iloc[0])
    dmin = _token(df["date"].min())
    dmax = _token(df["date"].max())
    basename_template = f"{ticker}_{dmin}_{dmax}_{feature_name}_{{i}}.parquet"

    ds.write_dataset(
        table,
        base_dir=str(out_dir),
        format="parquet",
        partitioning=partitioning,
        basename_template=basename_template,
        existing_data_behavior="overwrite_or_ignore",
    )

    return table.num_rows
