In [1]:
from pyspark.sql.window import Window
import pyspark.sql.functions as F
import logging
import json
import os
import hopsworks


Initializing spark session...
Spark session available as `spark`.


In [2]:
def backfill_nulls(df, order_by_column_name, partition_by=None):
    """Fill nulls in every column with the last non-null value seen so far.

    Despite the name this is a *forward* fill. In ``order_by_column_name``
    order, each null is replaced by the most recent non-null value of the
    same column from an earlier row. Nulls that precede a column's first
    non-null value stay null (``fill_initial_nulls`` handles those).

    Args:
        df: Spark DataFrame to fill. All of its columns are processed,
            including the ordering column.
        order_by_column_name: column that defines row order for the fill.
        partition_by: optional column name or list of column names. When
            given, the fill runs independently inside each group (e.g. per
            ticker). When omitted (the default), the whole frame is one
            ordered sequence, and Spark has to sort it in a single partition.

    Returns:
        DataFrame with the same columns, in the same order, nulls filled.
    """
    if partition_by:
        if isinstance(partition_by, str):
            partition_by = [partition_by]
        window = Window.partitionBy(*partition_by).orderBy(order_by_column_name)
    else:
        window = Window.orderBy(order_by_column_name)
    window = window.rowsBetween(Window.unboundedPreceding, Window.currentRow)

    # The frame ends at the current row, so last(..., ignorenulls=True) already
    # returns the row's own value when it is non-null; no when/otherwise needed.
    # One select (instead of withColumn per column) keeps the plan small and lets
    # Spark evaluate all columns with a single window operator.
    return df.select([
        F.last(F.col(name), ignorenulls=True).over(window).alias(name)
        for name in df.columns
    ])

def fill_initial_nulls(df, order_by_column_name, partition_by=None):
    """Fill nulls in every column with the next non-null value that follows.

    This is a backward fill. In ``order_by_column_name`` order, each null is
    replaced by the nearest non-null value of the same column from the
    current or a later row. When run after ``backfill_nulls`` (as
    ``process_data_for_ticker`` does), only the leading nulls of each column
    remain to be filled, which is where the name comes from. Trailing nulls
    with no later non-null value stay null.

    Args:
        df: Spark DataFrame to fill. All of its columns are processed,
            including the ordering column.
        order_by_column_name: column that defines row order for the fill.
        partition_by: optional column name or list of column names. When
            given, the fill runs independently inside each group (e.g. per
            ticker). When omitted (the default), the whole frame is one
            ordered sequence, and Spark has to sort it in a single partition.

    Returns:
        DataFrame with the same columns, in the same order, nulls filled.
    """
    if partition_by:
        if isinstance(partition_by, str):
            partition_by = [partition_by]
        window = Window.partitionBy(*partition_by).orderBy(order_by_column_name)
    else:
        window = Window.orderBy(order_by_column_name)
    window = window.rowsBetween(Window.currentRow, Window.unboundedFollowing)

    # The frame starts at the current row, so first(..., ignorenulls=True) already
    # returns the row's own value when it is non-null; no when/otherwise needed.
    # One select (instead of withColumn per column) keeps the plan small.
    return df.select([
        F.first(F.col(name), ignorenulls=True).over(window).alias(name)
        for name in df.columns
    ])

def process_news_for_ticker(df_news):
    """Collapse individual news articles into one text blob per day.

    For every article, ``description``, ``keywords`` (joined with spaces) and
    ``title`` are concatenated with spaces (``concat_ws`` skips null inputs).
    ``published_utc`` is truncated to the start of its day. The texts of all
    articles on the same day are joined into ``aggregated_text``. Finally,
    newline characters in that text are replaced with spaces.

    Args:
        df_news: DataFrame with ``published_utc``, ``description``,
            ``keywords`` and ``title`` columns. ``keywords`` is presumably an
            array of strings (TODO confirm against the news feature group).

    Returns:
        DataFrame with exactly two columns: ``published_utc`` (day-truncated)
        and ``aggregated_text``.

    Note:
        ``collect_list`` gives no ordering guarantee after the groupBy
        shuffle. The order of articles inside ``aggregated_text`` may
        therefore differ between runs.
    """
    article_text = F.concat_ws(
        " ",
        F.col("description"),
        F.concat_ws(" ", F.col("keywords")),
        F.col("title"),
    )

    daily_news = (
        df_news
        .withColumn('published_utc', F.date_trunc('day', F.col('published_utc')))
        .withColumn("combined_text", article_text)
        .groupBy("published_utc")
        .agg(F.concat_ws(" ", F.collect_list("combined_text")).alias("aggregated_text"))
    )

    # Flatten line breaks so each day's text is a single line.
    return daily_news.withColumn(
        "aggregated_text",
        F.regexp_replace(F.col("aggregated_text"), "\n", " "),
    )

def process_data_for_ticker(df_news, df):
    """Clean the price frame and attach each day's news text to it.

    Steps:
      1. Nulls in every column of ``df`` are forward-filled
         (``backfill_nulls``) and then backward-filled
         (``fill_initial_nulls``), both ordered by ``t``.
      2. ``t`` is treated as epoch milliseconds (it is divided by 1000 before
         ``from_unixtime``). It is converted to a timestamp and truncated to
         the start of the day, using the Spark session time zone.
      3. ``df_news`` is left-joined on ``t == published_utc``, and the
         ``published_utc`` column is dropped.
      4. Rows with no matching news get a single space as ``aggregated_text``.

    Args:
        df_news: daily news frame with ``published_utc`` and
            ``aggregated_text``, as produced by ``process_news_for_ticker``.
        df: price DataFrame containing a ``t`` column.

    Returns:
        The price rows (day-truncated ``t``) plus an ``aggregated_text`` column.
    """
    filled = fill_initial_nulls(backfill_nulls(df, 't'), 't')

    # epoch ms -> "yyyy-MM-dd HH:mm:ss" string -> timestamp -> midnight of that day
    day_start = F.date_trunc(
        'day',
        F.to_timestamp(
            F.date_format(F.from_unixtime(F.col("t") / 1000), "yyyy-MM-dd HH:mm:ss"),
            "yyyy-MM-dd HH:mm:ss",
        ),
    )
    prices = filled.withColumn('t', day_start)

    joined = (
        prices
        .join(df_news, prices.t == df_news.published_utc, how="left")
        .drop("published_utc")
    )

    return joined.withColumn(
        "aggregated_text",
        F.coalesce(F.col("aggregated_text"), F.lit(" ")),
    )

def process_ticker(df_news, df_price, lookback=5, cols_to_lag=('vw', 'n')):
    """Build the Silver-layer feature table from raw news and price frames.

    Pipeline (all windows are ordered by ``t``):
      1. ``process_news_for_ticker`` aggregates the news text per day.
      2. ``process_data_for_ticker`` fills price nulls, truncates ``t`` to the
         day and left-joins the daily news.
      3. Target columns are added:
         - ``target``: the next row's ``c - o``.
         - ``next_c``: the next row's ``c``.
         - ``target_class``: 1 if ``target > 0``, else 0.
         - ``c_class``: 1 if the current row's ``c > o``, else 0.
      4. For each column in ``cols_to_lag``, these features are added:
         - ``{col}_lag_1`` .. ``{col}_lag_{lookback}``.
         - ``{col}_rolling_mean`` and ``{col}_rolling_std`` over the previous
           ``lookback`` rows, current row excluded.
      5. Every row containing any null is dropped. This removes the first
         ``lookback`` rows (their longest lag is null) and the last row
         (it has no next-row ``target``).

    Args:
        df_news: raw news DataFrame (see ``process_news_for_ticker``).
        df_price: raw price DataFrame (see ``process_data_for_ticker``).
        lookback: number of lag columns per feature and size of the trailing
            rolling window. Defaults to 5 and must be >= 1.
        cols_to_lag: column name(s) to derive lag/rolling features from.
            Defaults to ``('vw', 'n')``.

    Returns:
        The feature DataFrame. Its columns are the joined price/news columns,
        then ``target``, ``next_c``, ``target_class``, ``c_class``, then the
        lag columns, then the rolling columns.

    Raises:
        ValueError: if ``lookback`` is less than 1.
    """
    if lookback < 1:
        raise ValueError(f"lookback must be >= 1, got {lookback}")
    if isinstance(cols_to_lag, str):
        cols_to_lag = [cols_to_lag]
    cols_to_lag = list(cols_to_lag)

    logging.info("Starting processing")

    daily_news = process_news_for_ticker(df_news)
    DF_target = process_data_for_ticker(daily_news, df_price)

    # NOTE(review): none of these windows uses partitionBy(...). If df_price
    # holds more than one ticker, leads/lags/rolling stats run across ticker
    # boundaries, and Spark sorts everything in a single partition. Confirm
    # the input is a single ticker, or partition the windows by 'ticker'.
    window_spec = Window.orderBy('t')
    DF_target = (
        DF_target
        .withColumn('target', F.lead(F.col('c') - F.col('o')).over(window_spec))
        .withColumn('next_c', F.lead('c').over(window_spec))
        .withColumn('target_class', F.when(F.col('target') > 0, 1).otherwise(0))
        .withColumn('c_class', F.when(F.col('c') > F.col('o'), 1).otherwise(0))
    )

    logging.info("Creating lag features")
    # One select instead of withColumn in a loop keeps the logical plan small.
    lag_columns = [
        F.lag(F.col(name), k).over(window_spec).alias(f'{name}_lag_{k}')
        for name in cols_to_lag
        for k in range(1, lookback + 1)
    ]
    DF_target = DF_target.select('*', *lag_columns)

    logging.info("Creating rolling features")
    # Trailing window: the `lookback` rows before the current one (current excluded).
    rolling_spec = Window.orderBy('t').rowsBetween(-lookback, -1)
    rolling_columns = []
    for name in cols_to_lag:
        rolling_columns.append(
            F.avg(F.col(name)).over(rolling_spec).alias(f'{name}_rolling_mean'))
        rolling_columns.append(
            F.stddev(F.col(name)).over(rolling_spec).alias(f'{name}_rolling_std'))
    DF_target = DF_target.select('*', *rolling_columns)

    DF_target = DF_target.dropna()

    logging.info("Final processing")

    return DF_target

In [3]:
# Log in to Hopsworks; `fs` is the project's feature store handle used by the cells below.
proj = hopsworks.login()
fs = proj.get_feature_store()


Logged in to project, explore it here https://hopsworks0.logicalclocks.com/p/119


In [4]:
logging.info("Starting Silver Layer")

# Source feature groups (version 1) for prices and news.
stock_fg = fs.get_or_create_feature_group(name="stock_price", version=1)
news_fg = fs.get_or_create_feature_group(name="stock_news", version=1)

# News is read before prices, matching the argument order of process_ticker.
raw_news = news_fg.read()
raw_prices = stock_fg.read()
df = process_ticker(raw_news, raw_prices)

logging.info("All tickers processed and Silver Layer completed.")
df.show()

2024-09-18 15:10:31,432 INFO: Starting Silver Layer
2024-09-18 15:10:36,083 INFO: Starting processing
2024-09-18 15:10:37,866 INFO: Creating lag features
2024-09-18 15:10:38,084 INFO: Creating rolling features
2024-09-18 15:10:38,302 INFO: Final processing for
2024-09-18 15:10:38,404 INFO: All tickers processed and Silver Layer completed.
+------+-------------------+-------+--------+--------+------+------------+--------+-------+---------------+-----------+------+------------+-------+--------+--------+--------+--------+--------+-------+-------+-------+-------+-------+------------------+------------------+--------------+------------------+
|ticker|                  t|      o|       h|       l|     c|           v|      vw|      n|aggregated_text|     target|next_c|target_class|c_class|vw_lag_1|vw_lag_2|vw_lag_3|vw_lag_4|vw_lag_5|n_lag_1|n_lag_2|n_lag_3|n_lag_4|n_lag_5|   vw_rolling_mean|    vw_rolling_std|n_rolling_mean|     n_rolling_std|
+------+-------------------+-------+--------+----

In [5]:


# Destination feature group for the engineered features; `t` is the event time.
# NOTE(review): primary_key is only ['ticker'], while `df` likely holds one row
# per ticker per day. Depending on Hopsworks upsert semantics this may keep
# only one row per ticker. Consider ['ticker', 't']; confirm in the Hopsworks
# feature group docs. Note that get_or_create ignores this argument if the
# group already exists.
fg = fs.get_or_create_feature_group(
    name="stock_features",
    version=1,
    primary_key=['ticker'],
    event_time="t",
)
fg.insert(df)

(None, None)