In [0]:
%run /Workspace/Users/jorgegarciaotero@gmail.com/tfm_databricks/config/database_connector

In [0]:
from pyspark.sql.window import Window
import pyspark.sql.functions as F


#### FUNCIONES

In [0]:
def check_null_values(df, excluded_cols=("date", "symbol")) -> None:
    """
    Display, per column, how many rows are missing (null, or NaN for floating-point columns).

    Only columns with at least one missing value are shown, sorted by the
    missing count in descending order.

    ARGS:
        df: Spark DataFrame to inspect.
        excluded_cols: Column names to skip. Defaults to ('date', 'symbol').

    RETURNS:
        None. The result is rendered with Databricks' ``display``.
    """
    excluded = set(excluded_cols or ())
    cols_to_check = [c for c in df.columns if c not in excluded]
    if not cols_to_check:
        print("check_null_values: no columns to check")
        return

    # isnan is only meaningful for float/double columns; on boolean, date or
    # timestamp columns it fails analysis, and on strings it forces a cast.
    float_cols = {name for name, dtype in df.dtypes if dtype in ("float", "double")}

    def _quoted(name):
        # Backtick-quote so names containing dots/spaces resolve as one column.
        return F.col("`" + name.replace("`", "``") + "`")

    def _missing(name):
        cond = _quoted(name).isNull()
        if name in float_cols:
            cond = cond | F.isnan(_quoted(name))
        return cond

    # Single aggregate row: one missing-value count per checked column.
    null_row = df.select([
        F.count(F.when(_missing(c), 1)).alias(c) for c in cols_to_check
    ])

    # Reshape to long format (column, nulls) with explode/struct instead of a
    # hand-built SQL `stack` string, so column names never need escaping.
    stacked = null_row.select(
        F.explode(F.array(*[
            F.struct(F.lit(c).alias("column"), _quoted(c).alias("nulls"))
            for c in cols_to_check
        ])).alias("entry")
    ).select("entry.column", "entry.nulls")

    # Keep only columns with missing values, largest count first.
    result = stacked.filter(F.col("nulls") > 0).orderBy(F.desc("nulls"))

    display(result)


In [0]:
def remove_initial_days_per_symbol(df, min_days=20):
    """
    Drop the earliest ``min_days`` rows of every symbol.

    Rows are ranked inside each ``symbol`` by ascending ``date``. Only rows whose
    rank is strictly greater than ``min_days`` are kept. If a symbol has
    duplicate dates, the relative ranking of those duplicates is not guaranteed.

    Args:
        df (DataFrame): Spark DataFrame containing at least 'symbol' and 'date'.
        min_days (int): How many leading rows to discard for each symbol.

    Returns:
        DataFrame: ``df`` without the first ``min_days`` rows of each symbol
        (the temporary ranking column is removed).
    """
    from pyspark.sql.window import Window
    from pyspark.sql.functions import row_number

    by_symbol_in_date_order = Window.partitionBy("symbol").orderBy("date")
    ranked = df.withColumn("row_num", row_number().over(by_symbol_in_date_order))
    return ranked.where(F.col("row_num") > min_days).drop("row_num")


#### MAIN

In [0]:
# Create the input widgets with their default values. The "date" widget is
# created as well because it is read below; dbutils.widgets.get() on a widget
# that was never defined fails when the notebook is run interactively.
dbutils.widgets.text("storage_account", "smartwalletjorge", "Storage Account")
dbutils.widgets.text("container", "smart-wallet-dl", "Container")
dbutils.widgets.text("database", "smart_wallet", "Database")
dbutils.widgets.text("date", "", "Date")

storage_account = dbutils.widgets.get("storage_account")
container = dbutils.widgets.get("container")
database_name = dbutils.widgets.get("database")
# A blank date widget is normalised to None before reading
# (presumably "no date filter" in DatabaseConnector — confirm there).
date_value = (dbutils.widgets.get("date") or "").strip() or None

db_connector = DatabaseConnector()
print(f"database_name :{database_name}")

df = db_connector.read_table_from_path(container, database_name, "stock_data_parquet", date_value, "parquet")
    

##### 1. Esquema y nulos

In [0]:
# Row count and schema of the loaded DataFrame.
row_total = df.count()
print(f"Counts : {row_total}")
df.printSchema()


In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, lag, avg, stddev, when, log, avg,abs, lit, last, isnan,lead
from pyspark.sql.functions import greatest, least,  row_number, max as spark_max, expr
from datetime import datetime, timedelta
import sys

In [0]:
def add_future_returns(df, horizons=None):
    """
    Add forward-looking price and return columns for several horizons.

    For each (label, n_rows) horizon this adds:
      - price_lead_<label>: ``close_v`` taken ``n_rows`` rows later within the
        same symbol, with rows ordered by ``date``.
      - ret_next_<label>: ``price_lead_<label> / close_v - 1``. The value is null
        when the future price is missing or ``close_v`` is not strictly positive.

    Horizons are counted in rows, not calendar days. Assuming ~21 trading days
    per month, the defaults are 63 (3m), 126 (6m) and 252 (1y) rows. Gaps in a
    symbol's history therefore lengthen the effective horizon. The last
    ``n_rows`` rows of every symbol get nulls for that horizon.

    The window sorts by ``date`` itself, so the input does not need to be
    pre-ordered.

    Args:
      df (DataFrame): Spark DataFrame with at least ['symbol', 'date', 'close_v'].
      horizons (dict[str, int] | None): Mapping label -> rows ahead. Defaults to
        {'3m': 63, '6m': 126, '1y': 252}.

    Returns:
      DataFrame: ``df`` with all price_lead_* columns appended first, followed
      by all ret_next_* columns, in the iteration order of ``horizons``.
    """
    if horizons is None:
        trading_days_per_month = 21
        horizons = {
            "3m": trading_days_per_month * 3,    # ~63 trading days
            "6m": trading_days_per_month * 6,    # ~126 trading days
            "1y": trading_days_per_month * 12,   # ~252 trading days
        }

    w = Window.partitionBy("symbol").orderBy("date")

    # Add the future closing prices first, so every price_lead_* column
    # precedes every ret_next_* column in the output schema.
    for label, n_rows in horizons.items():
        df = df.withColumn(f"price_lead_{label}", F.lead("close_v", n_rows).over(w))

    # Forward returns. when() without otherwise() yields null if the guard fails.
    for label in horizons:
        lead_price = F.col(f"price_lead_{label}")
        df = df.withColumn(
            f"ret_next_{label}",
            F.when(
                lead_price.isNotNull() & (F.col("close_v") > 0),
                lead_price / F.col("close_v") - 1,
            ),
        )

    return df

In [0]:
# Return/target columns removed from `df` before the forward returns are computed.
old_cols = (
    [f"ret_past_{period}" for period in ("1d", "7d", "1m", "3m", "6m", "1y", "5y")]
    + ["ret_total"]
    + [f"target_{period}" for period in ("3m", "6m", "1y")]
)

In [0]:
existing_cols = set(df.columns)
df_clean = df.drop(*[name for name in old_cols if name in existing_cols])


In [0]:
# Schema after dropping the columns listed in old_cols.
df_clean.printSchema()

In [0]:
df_updated = add_future_returns(df=df_clean)


In [0]:
# Schema after add_future_returns, which appends price_lead_* and ret_next_* columns.
df_updated.printSchema()

In [0]:
# price_lead_* / ret_next_* are null for the last rows of each symbol (no future price).
check_null_values(df=df_updated)


In [0]:
# Build the output location from the widget values instead of hard-coding it.
# With the default widgets this resolves to
# abfss://smart-wallet-dl@smartwalletjorge.dfs.core.windows.net/smart_wallet/stock_data_parquet.
# NOTE(review): this overwrites the table `df` was read from. If
# read_table_from_path filters by date_value, only that subset is written back.
# Confirm this is intended before running with a date. coalesce(1) writes a single file.
output_path = (
    f"abfss://{container}@{storage_account}.dfs.core.windows.net/"
    f"{database_name}/stock_data_parquet"
)
df_updated.coalesce(1).write.mode("overwrite").format("parquet").save(output_path)

In [0]:
# Null/NaN counts per column of the loaded DataFrame.
check_null_values(df=df)

`capital_gains` contiene las ganancias de capital, un campo que rara vez viene informado. Posiblemente se eliminará en la fase de EDA de los modelos.

Al quitar los primeros 20 días de cada acción, vemos que desaparecen los valores vacíos de features como `gap_open`, `delta` o `prev_close`. Son valores que no existen para los primeros días calculados.

In [0]:
df_cleaned = remove_initial_days_per_symbol(df=df, min_days=20)
check_null_values(df=df_cleaned)

In [0]:
# Rows whose momentum_10 is missing (null or NaN). These are exactly the rows
# dropped below, so the displayed table matches what gets removed.
momentum_missing = F.col("momentum_10").isNull() | F.isnan("momentum_10")
df_aux = df_cleaned.filter(momentum_missing)
display(df_aux)
df_cleaned = df_cleaned.filter(~momentum_missing)
check_null_values(df_cleaned)

Existen 17 valores nulos de `log_return`. En todos los casos, o bien `close_v` es negativo, o bien `prev_close` es 0. Son valores corruptos que se deben eliminar en la extracción de los datos.

In [0]:

df_aux = df_cleaned.where(F.col("log_return").isNull())
display(
    df_aux.select("date", "symbol", "log_return", "close_v", "prev_close", "volume")
)

In [0]:
# Spot-check a few LENZ dates around the null log_return values.
lenz_dates = ["2024-03-21", "2024-03-22", "2024-03-14", "2024-03-15"]
is_lenz_sample = (F.col("symbol") == "LENZ") & F.col("date").isin(lenz_dates)
(
    df_cleaned
    .filter(is_lenz_sample)
    .orderBy("date")
    .select("date", "symbol", "log_return", "close_v", "prev_close", "volume")
    .display()
)


Volvemos a calcular los valores nulos. 

In [0]:
check_null_values(df=df_cleaned)

In [0]:
# Schema of the cleaned DataFrame.
df_cleaned.printSchema()

In [0]:
# AAPL price and past-return history, for a manual sanity check.
# display() returns None, so the DataFrame is kept in df_apple and shown separately.
df_apple = (
    df_cleaned
    .filter(F.col("symbol") == "AAPL")
    .orderBy("date")
    .select(
        "date", "close_v", "prev_close", "open_v",
        "ret_past_1d", "ret_past_7d", "ret_past_1m", "ret_past_3m",
        "ret_past_6m", "ret_past_1y", "ret_past_5y", "ret_total",
    )
)
display(df_apple)
