# Import Packages

In [0]:
import sys

sys.path.append("/Workspace/Shared/lib/")
import os
import logging
import time
import requests
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.utils import AnalysisException
from params import get_env, get_catalog, get_schema, get_table, get_url, get_volume

# Logging

In [0]:
# Send INFO-and-above log records to stdout using a timestamped format.
# force=True replaces any handlers already attached to the root logger.
LOG_FORMAT = "%(asctime)s %(name)s [%(levelname)s] %(message)s"
logging.basicConfig(stream=sys.stdout, level=logging.INFO, format=LOG_FORMAT, force=True)

# Params

In [0]:
# Resolve the run parameters through the helpers imported from `params`.
env, catalog_suffix = get_env()
catalog, schema, table = get_catalog(), get_schema(), get_table()
url, volume = get_url(), get_volume()

# Echo each resolved parameter, one "name = value" line apiece.
for label, value in (
    ("env", env),
    ("catalog_suffix", catalog_suffix),
    ("catalog", catalog),
    ("schema", schema),
    ("table", table),
    ("url", url),
    ("volume", volume),
):
    print(f"{label} = {value}")

In [0]:
# Target location for the tables built below.
# The catalog is derived from the environment suffix, following the same
# `<name>{catalog_suffix}` convention used for the `raw` source catalog, rather
# than being pinned to the dev catalog regardless of environment.
catalog = f"featlib{catalog_suffix}"
schema = "components"

# Split Adjustment Factor

In [0]:
# Build {catalog}.{schema}.split_adj: one row per symbol and per period between
# consecutive splits.
#   adj_date / end_adj_date : first and last day of the period (end is NULL for
#                             the most recent period of a symbol)
#   adj_factor              : for_factor / to_factor of the split that opens the
#                             period (NULL for the synthetic 2000-01-01 row)
#   cum_adj_factor          : product of the adj_factor of every later split,
#                             1 for the most recent period
# The earliest period of a symbol starts at the earlier of 2000-01-01 and its
# first ex_date; nothing in this table covers dates before that.
# The split columns are listed explicitly because UNION ALL matches columns by
# position, so `SELECT *` would silently depend on the source column order.
spark.sql(
    f"""
    CREATE OR REPLACE TABLE {catalog}.{schema}.split_adj AS
        -- Get the unique symbols from the split table
        WITH base_table AS (
            SELECT DISTINCT act_symbol FROM raw{catalog_suffix}.stocks.split
        ),
        -- Add a synthetic row per symbol with a NULL split dated 2000-01-01
        new_split_table AS (
            SELECT
                act_symbol,
                DATE '2000-01-01' AS ex_date,
                NULL AS to_factor,
                NULL AS for_factor
            FROM base_table
            UNION ALL
            SELECT
                act_symbol,
                ex_date,
                to_factor,
                for_factor
            FROM raw{catalog_suffix}.stocks.split
        ),
        -- Per-split adjustment factor (NULL when to_factor is NULL or zero)
        split_factors AS (
            SELECT
                act_symbol,
                ex_date AS adj_date,
                try_divide(double(for_factor), double(to_factor)) AS adj_factor
            FROM new_split_table
        ),
        -- Assign each period its end date and a reverse-chronological row number
        ordered_factors AS (
            SELECT
                act_symbol,
                adj_date,
                CAST (LEAD(adj_date) OVER (PARTITION BY act_symbol ORDER BY adj_date ASC) - INTERVAL '1 DAY' AS DATE) AS end_adj_date,
                adj_factor,
                ROW_NUMBER() OVER (PARTITION BY act_symbol ORDER BY adj_date DESC) AS reverse_row_num
            FROM split_factors
        ),
        -- Cumulative adjustment factor, accumulated from the newest split backwards
        cumulative_factors AS (
            SELECT
                act_symbol,
                adj_date,
                end_adj_date,
                adj_factor,
                CASE
                    -- Most recent period: nothing left to adjust for
                    WHEN end_adj_date IS NULL
                        THEN 1
                    -- Period right before the newest split: exactly that split's factor
                    WHEN lead(end_adj_date) OVER (PARTITION BY act_symbol ORDER BY adj_date ASC) IS NULL
                        THEN lead(adj_factor) OVER (PARTITION BY act_symbol ORDER BY adj_date ASC)
                    -- Older periods: product of all later factors, computed as exp(sum(ln(x)))
                    ELSE EXP(SUM(LN(adj_factor)) OVER (PARTITION BY act_symbol ORDER BY reverse_row_num ASC ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING))
                END AS cum_adj_factor
            FROM ordered_factors
        )
        -- Final output
        SELECT
            act_symbol,
            adj_date,
            end_adj_date,
            adj_factor,
            cum_adj_factor
        FROM cumulative_factors
        ORDER BY adj_date ASC;
    """
)

# Pricing Inc. Splits

In [0]:
# Create the temporary view tmp_pricing01_inc_splits: raw OHLCV rows with the
# cumulative split factor applied. Prices are multiplied by the factor and volume
# is divided by it; rows that match no split_adj period fall back to a factor of 1.
# The split table is read through {catalog}.{schema}, i.e. the same identifier the
# split_adj cell writes to, so this view always consumes the table built above.
# The "-- 17,457" marker is kept from the original; presumably a symbol count
# from an earlier run - TODO confirm.
spark.sql(
    f"""
    -- 17,457
    CREATE OR REPLACE TEMPORARY VIEW tmp_pricing01_inc_splits AS (
        with ohlcv_split as (
            select
                to_date(a.date) as date,
                a.act_symbol,
                a.open * COALESCE(f.cum_adj_factor, 1) as open,
                a.high * COALESCE(f.cum_adj_factor, 1) as high,
                a.low * COALESCE(f.cum_adj_factor, 1) as low,
                a.close * COALESCE(f.cum_adj_factor, 1) as close,
                a.volume / COALESCE(f.cum_adj_factor, 1) as volume
            FROM raw{catalog_suffix}.stocks.ohlcv a
            LEFT JOIN {catalog}.{schema}.split_adj f
                ON a.act_symbol=f.act_symbol
                AND a.date >= f.adj_date
                AND (a.date <= f.end_adj_date OR f.end_adj_date IS NULL)
        )
        select * from ohlcv_split
    )
    """
)

In [0]:
%sql
-- Preview the split-adjusted pricing view.
SELECT *
FROM tmp_pricing01_inc_splits

In [0]:
%sql
-- Count the distinct symbols present in the split-adjusted view.
WITH symbols AS (
    SELECT DISTINCT act_symbol
    FROM tmp_pricing01_inc_splits
)
SELECT COUNT(*)
FROM symbols

# Remove Symbols With History Gaps


In [0]:
# Create the temporary view tmp_pricing02_remsym_misshist: drops every symbol that
# has a gap of more than 14 calendar days between two consecutive rows, and drops
# rows where any OHLCV column is NULL.
# - The gap is computed with DATEDIFF, which returns an integer day count directly.
# - NULL symbols are excluded from the exclusion list: a single NULL inside a
#   NOT IN subquery makes the predicate unknown for every row, which would empty
#   the view.
# The "-- 14,566" marker is kept from the original; presumably a symbol count
# from an earlier run - TODO confirm.
spark.sql(
    """
    -- 14,566
    -- Remove symbols with gaps > 14 days
    CREATE OR REPLACE TEMPORARY VIEW tmp_pricing02_remsym_misshist AS (
        WITH day_difference AS (
            SELECT
                *,
                DATEDIFF(LEAD(date) OVER (PARTITION BY act_symbol ORDER BY date), date) AS day_count
            FROM
                tmp_pricing01_inc_splits
            ),
        filtered_symbols AS (
            SELECT
                act_symbol
            FROM
                day_difference
            WHERE
                day_count > 14
            AND act_symbol IS NOT NULL
            GROUP BY
                act_symbol
        )
        SELECT
            * except (day_count)
        FROM
            day_difference
        WHERE
            act_symbol NOT IN (SELECT act_symbol FROM filtered_symbols)
        AND open IS NOT NULL
        AND high IS NOT NULL
        AND low IS NOT NULL
        AND close IS NOT NULL
        AND volume IS NOT NULL
    )
    """
)

In [0]:
%sql
-- Preview the view left after removing symbols with history gaps.
SELECT *
FROM tmp_pricing02_remsym_misshist

In [0]:
%sql
-- Count the distinct symbols that survived the history-gap filter.
WITH symbols AS (
    SELECT DISTINCT act_symbol
    FROM tmp_pricing02_remsym_misshist
)
SELECT COUNT(*)
FROM symbols

# Remove Zero Prices

In [0]:
# Create the temporary view tmp_pricing03_remsym_zeros:
# 1. Drop every symbol that has more than 5 zero values in any single OHLCV column.
# 2. For the remaining symbols, replace a zero with an aggregate over the previous
#    5 rows of the same symbol: the maximum for high, the average for the other
#    columns. Zeros inside that window are ignored for every column, so a run of
#    consecutive zeros does not drag the replacement value towards zero. If the
#    window holds no non-zero value, the result is NULL.
# 3. Keep only rows where high >= low (this also drops rows whose high or low
#    ended up NULL).
# NULL symbols are filtered out of the exclusion list because a NULL inside a
# NOT IN subquery would make the predicate unknown for every row.
spark.sql(
    """
    -- Remove symbols which have more than 5 zero values for ohlcv
    -- Replace remaining zeros with the max (high) or average (open, low, close, volume)
    -- of the non-zero values in the previous 5 rows
    CREATE OR REPLACE TEMPORARY VIEW tmp_pricing03_remsym_zeros AS (
        WITH zero_close_count AS (
        SELECT
            act_symbol,
            COUNT(*) AS zero_close_count
        FROM
            tmp_pricing02_remsym_misshist
        WHERE
            close = 0
        GROUP BY
            act_symbol
        HAVING
            zero_close_count > 5
        ),
        zero_low_count AS (
            SELECT
                act_symbol,
                COUNT(*) AS zero_low_count
            FROM
                tmp_pricing02_remsym_misshist
            WHERE
                low = 0
            GROUP BY
                act_symbol
            HAVING
                zero_low_count > 5
        ),
        zero_open_count AS (
            SELECT
                act_symbol,
                COUNT(*) AS zero_open_count
            FROM
                tmp_pricing02_remsym_misshist
            WHERE
                open = 0
            GROUP BY
                act_symbol
            HAVING
                zero_open_count > 5
        ),
        zero_high_count AS (
            SELECT
                act_symbol,
                COUNT(*) AS zero_high_count
            FROM
                tmp_pricing02_remsym_misshist
            WHERE
                high = 0
            GROUP BY
                act_symbol
            HAVING
                zero_high_count > 5
        ),
        zero_volume_count AS (
            SELECT
                act_symbol,
                COUNT(*) AS zero_volume_count
            FROM
                tmp_pricing02_remsym_misshist
            WHERE
                volume = 0
            GROUP BY
                act_symbol
            HAVING
                zero_volume_count > 5
        ),
        filtered_symbols AS (
            SELECT act_symbol FROM zero_close_count
            UNION
            SELECT act_symbol FROM zero_low_count
            UNION
            SELECT act_symbol FROM zero_open_count
            UNION
            SELECT act_symbol FROM zero_high_count
            UNION
            SELECT act_symbol FROM zero_volume_count
        ),
        remove_symbols_and_fix_zeros AS (
            SELECT
                date,
                act_symbol,
                COALESCE(
                    NULLIF(open, 0),
                    AVG(NULLIF(open, 0)) OVER (
                        PARTITION BY act_symbol
                        ORDER BY date asc
                        ROWS BETWEEN 5 PRECEDING AND 1 PRECEDING
                    )
                ) AS open,
                COALESCE(
                    NULLIF(high, 0),
                    MAX(NULLIF(high, 0)) OVER (
                        PARTITION BY act_symbol
                        ORDER BY date asc
                        ROWS BETWEEN 5 PRECEDING AND 1 PRECEDING
                    )
                ) AS high,
                COALESCE(
                    NULLIF(low, 0),
                    AVG(NULLIF(low, 0)) OVER (
                        PARTITION BY act_symbol
                        ORDER BY date asc
                        ROWS BETWEEN 5 PRECEDING AND 1 PRECEDING
                    )
                ) AS low,
                COALESCE(
                    NULLIF(close, 0),
                    AVG(NULLIF(close, 0)) OVER (
                        PARTITION BY act_symbol
                        ORDER BY date asc
                        ROWS BETWEEN 5 PRECEDING AND 1 PRECEDING
                    )
                ) AS close,
                COALESCE(
                    NULLIF(volume, 0),
                    AVG(NULLIF(volume, 0)) OVER (
                        PARTITION BY act_symbol
                        ORDER BY date asc
                        ROWS BETWEEN 5 PRECEDING AND 1 PRECEDING
                    )
                ) AS volume
            FROM
                tmp_pricing02_remsym_misshist
            WHERE
                act_symbol NOT IN (
                    SELECT act_symbol FROM filtered_symbols WHERE act_symbol IS NOT NULL
                )
        )
        SELECT
            *
        FROM remove_symbols_and_fix_zeros
        where high >= low
    );
    """
)

In [0]:
%sql
-- Count the distinct symbols that survived the zero-value filter.
WITH symbols AS (
    SELECT DISTINCT act_symbol
    FROM tmp_pricing03_remsym_zeros
)
SELECT COUNT(*)
FROM symbols

# Resample to Business Day Frequency

Reindex every symbol onto a shared business-day calendar and forward-fill the OHLCV values for days that have no row.

In [0]:
# Create the temporary view tmp_pricing04_resampled: every symbol reindexed onto a
# Monday-Friday calendar, with missing days forward-filled from the last known row.
# - The calendar spans the global min/max date of tmp_pricing03_remsym_zeros and is
#   cut off per symbol at that symbol's last date.
# - Weekends are removed with dayofweek(), which numbers days 1 (Sunday) through
#   7 (Saturday). Spark's EXTRACT(DOW ...) uses that same 1-7 numbering, so the
#   previous filter on (0, 6) kept both weekend days and removed Fridays.
# - Calendar days before a symbol's first row have nothing to forward-fill from and
#   are removed by the final IS NOT NULL filter, together with 'Infinity' values.
spark.sql(
"""
CREATE OR REPLACE TEMPORARY VIEW tmp_pricing04_resampled AS (
    -- Step 1: Identify the max date for each symbol
    WITH max_dates AS (
        SELECT
            act_symbol,
            MAX(date) AS max_date
        FROM
            tmp_pricing03_remsym_zeros
        GROUP BY
            act_symbol
    ),

    -- Step 2: Generate a business day calendar for the date range in your data
    date_series AS (
        SELECT
            EXPLODE(SEQUENCE(
                MIN(date), -- Start date of your data
                MAX(date), -- End date of your data
                INTERVAL 1 DAY
            )) AS date
        FROM
            tmp_pricing03_remsym_zeros
    ),
    business_days AS (
        SELECT
            date
        FROM
            date_series
        WHERE
            dayofweek(date) NOT IN (1, 7) -- Exclude Sundays (1) and Saturdays (7)
        ORDER BY
            date
    ),

    -- Step 3: Create a complete business day calendar for each symbol up to its max date
    symbol_calendar AS (
        SELECT
            bd.date,
            md.act_symbol
        FROM
            business_days bd
        CROSS JOIN
            max_dates md
        WHERE
            bd.date <= md.max_date
    ),

    -- Step 4: Join the calendar with your data and forward-fill missing values
    resampled_data AS (
        SELECT
            sc.date,
            sc.act_symbol,
            COALESCE(p.open, LAST(p.open, TRUE) OVER (PARTITION BY sc.act_symbol ORDER BY sc.date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)) AS open,
            COALESCE(p.high, LAST(p.high, TRUE) OVER (PARTITION BY sc.act_symbol ORDER BY sc.date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)) AS high,
            COALESCE(p.low, LAST(p.low, TRUE) OVER (PARTITION BY sc.act_symbol ORDER BY sc.date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)) AS low,
            COALESCE(p.close, LAST(p.close, TRUE) OVER (PARTITION BY sc.act_symbol ORDER BY sc.date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)) AS close,
            COALESCE(p.volume, LAST(p.volume, TRUE) OVER (PARTITION BY sc.act_symbol ORDER BY sc.date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)) AS volume
        FROM
            symbol_calendar sc
        LEFT JOIN
            tmp_pricing03_remsym_zeros p
        ON
            sc.date = p.date AND sc.act_symbol = p.act_symbol
    )

    -- Final Output
    SELECT * FROM resampled_data
    WHERE open IS NOT NULL AND high IS NOT NULL AND low IS NOT NULL AND close IS NOT NULL AND volume IS NOT NULL
    -- AND isnan(open) = false AND isnan(high) = false AND isnan(low) = false AND isnan(close) = false AND isnan(volume) = false
    AND open != 'Infinity' AND high != 'Infinity' AND low != 'Infinity' AND close != 'Infinity' AND volume != 'Infinity'
    ORDER BY act_symbol, date
);
"""
)