---
## Create Dynamic Table
`HISTORICAL_QUOTES_TYPED` is a copy of the table created by the Openflow Kafka connector, but with correct types for date and currency columns.

In [None]:
USE ROLE ACCOUNTADMIN;
USE DATABASE NASDAQ_DEMO;

SELECT
    TABLE_NAME,
    TABLE_TYPE,
    TABLE_OWNER
FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_SCHEMA = 'PUBLIC'
AND TABLE_NAME LIKE 'HISTORICAL_%'
OR TABLE_NAME LIKE 'EARNINGS_%';

In [None]:
CREATE OR REPLACE DYNAMIC TABLE HISTORICAL_QUOTES_TYPED
    TARGET_LAG = '1 hour'
    WAREHOUSE = 'COMPUTE_WH'
    AS
        SELECT 
            SYMBOL,
            TO_DATE(DATE) AS QUOTE_DATE,
            TO_DOUBLE(LTRIM(CLOSE_LAST, '$')) AS CLOSE_LAST_USD,
            VOLUME,
            TO_DOUBLE(LTRIM(OPEN, '$')) AS OPEN_USD,
            TO_DOUBLE(LTRIM(HIGH, '$')) AS HIGH_USD,
            TO_DOUBLE(LTRIM(LOW, '$')) AS LOW_USD
        FROM HISTORICAL_STOCK_QUOTES;

In [None]:
------------------------------
-- Speed things up a bit
------------------------------
ALTER DYNAMIC TABLE HISTORICAL_QUOTES_TYPED SET TARGET_LAG = '1 minute';
-- ALTER DYNAMIC TABLE HISTORICAL_QUOTES_TYPED SET TARGET_LAG = '1 hour';

In [None]:
SELECT 
    SYMBOL,
    COUNT(QUOTE_DATE) AS num_quotes,
    MAX(VOLUME),
    MAX(CLOSE_LAST_USD),
    MAX(QUOTE_DATE)
FROM HISTORICAL_QUOTES_TYPED
GROUP BY SYMBOL
ORDER BY SYMBOL;

---
## Stock Price Forecast

`SNOWFLAKE.ML.FORECAST` is a Snowflake Cortex machine learning function that enables time series forecasting by training models on historical data to predict future numeric values. The process involves creating a forecasting model using `CREATE SNOWFLAKE.ML.FORECAST` with input data containing a timestamp column (with fixed frequency intervals) and a target numeric column, along with optional exogenous variables that can influence predictions. Once trained, the model becomes a reusable schema-level object that can generate forecasts by calling the `!FORECAST` method and specifying the number of future periods to predict. The function supports both single-series and multi-series forecasting, automatically detects variable types, and provides additional methods for feature importance analysis and evaluation metrics, making it ideal for common use cases like sales forecasting, demand planning, and revenue predictions based on seasonality and historical trends.

In [None]:
CREATE OR REPLACE VIEW HISTORICAL_QUOTES_TIMESERIES AS 
    SELECT 
        SYMBOL,
        QUOTE_DATE,
        CLOSE_LAST_USD        
    FROM HISTORICAL_QUOTES_TYPED
    ORDER BY SYMBOL, QUOTE_DATE;

In [None]:
------------------------------
-- Create forecast model 
------------------------------
CREATE OR REPLACE SNOWFLAKE.ML.FORECAST HISTORICAL_QUOTES_FORECAST_MODEL(
  INPUT_DATA => TABLE(HISTORICAL_QUOTES_TIMESERIES),
  SERIES_COLNAME => 'SYMBOL',
  TIMESTAMP_COLNAME => 'QUOTE_DATE',
  TARGET_COLNAME => 'CLOSE_LAST_USD'
);

SHOW SNOWFLAKE.ML.FORECAST;
-- DROP SNOWFLAKE.ML.FORECAST HISTORICAL_QUOTES_FORECAST_MODEL;

In [None]:
------------------------------
-- Create table with forecasts for next 3 months
-- 5 working days a week, 4 weeks a month, 3 months = 60 periods
------------------------------

CREATE OR REPLACE TABLE HISTORICAL_QUOTES_FORECAST AS
  SELECT * 
  FROM TABLE(HISTORICAL_QUOTES_FORECAST_MODEL!FORECAST(FORECASTING_PERIODS => 60));

In [None]:
SELECT 
    SERIES,
    TS,
    FORECAST,
    LOWER_BOUND,
    UPPER_BOUND
FROM HISTORICAL_QUOTES_FORECAST
WHERE SERIES = 'TSLA' AND TS > CURRENT_TIMESTAMP()
ORDER BY SERIES, TS
LIMIT 10;

---
## Cortex Search for Earnings Reports

In [None]:
-- CREATE OR REPLACE STAGE EARNINGS_REPORTS_STAGE
--     ENCRYPTION = ( TYPE = 'SNOWFLAKE_SSE')
--     DIRECTORY = ( ENABLE = TRUE );

-- GRANT READ ON STAGE EARNINGS_REPORTS_STAGE TO ROLE OPENFLOW_RUNTIME_ROLE;
-- GRANT WRITE ON STAGE EARNINGS_REPORTS_STAGE TO ROLE OPENFLOW_RUNTIME_ROLE;

-- GRANT READ ON STAGE EARNINGS_REPORTS_STAGE TO ROLE ACCOUNTADMIN;
-- GRANT WRITE ON STAGE EARNINGS_REPORTS_STAGE TO ROLE ACCOUNTADMIN;

ALTER STAGE EARNINGS_REPORTS_STAGE REFRESH;

SELECT 
    COUNT(*) as total_documents,
    COUNT(DISTINCT RELATIVE_PATH) as unique_files,
    ROUND(AVG(SIZE)/1024/1024, 2) as avg_size_mb
FROM DIRECTORY(@EARNINGS_REPORTS_STAGE);

-- REMOVE @EARNINGS_REPORTS_STAGE;

In [None]:
----------------------------------------
-- Create table to hold the extracted text from the PDF files
--
-- See AISQL AI_PARSE_DOCUMENT
--   https://docs.snowflake.com/en/user-guide/snowflake-cortex/parse-document
----------------------------------------

-- CREATE OR REPLACE TABLE EARNINGS_REPORTS_PARSED (
--   relative_path VARCHAR(), 
--   markdown VARIANT
-- );

-- INSERT INTO "EARNINGS_REPORTS_PARSED" (relative_path, markdown)
-- WITH staged_reports AS (
--     SELECT
--         relative_path
--     FROM DIRECTORY(@EARNINGS_REPORTS_STAGE)
-- )
-- SELECT
--     relative_path,
--     AI_PARSE_DOCUMENT (
--         TO_FILE('@EARNINGS_REPORTS_STAGE', relative_path),
--         {'mode': 'LAYOUT'}
--     ) AS markdown
-- FROM staged_reports;

SELECT
    relative_path,
    markdown
FROM "EARNINGS_REPORTS_PARSED"
WHERE relative_path LIKE 'TSLA_%'
ORDER BY relative_path
LIMIT 10;

In [None]:
----------------------------------------
-- Chunk the text based on paragraph separators
--
-- See SPLIT_TEXT_RECURSIVE_CHARACTER
--   https://docs.snowflake.com/en/sql-reference/functions/split_text_recursive_character-snowflake-cortex
----------------------------------------

-- CREATE OR REPLACE TABLE EARNINGS_REPORTS_CHUNKS (
--   relative_path VARCHAR(), 
--   chunk STRING
-- );

-- INSERT INTO "EARNINGS_REPORTS_CHUNKS" (relative_path, chunk)
-- WITH report_chunks AS (
--     SELECT
--         relative_path,
--         SNOWFLAKE.CORTEX.SPLIT_TEXT_RECURSIVE_CHARACTER(
--             markdown:content::STRING, -- Extract the 'content' field from the JSON
--             'markdown',               -- Format type
--             2000,                     -- Chunk size (in tokens)
--             100,                      -- Overlap size
--             ['\n\n']                  -- Paragraph separators
--         ) AS chunks
--     FROM "EARNINGS_REPORTS_PARSED"
-- )
-- SELECT
--     relative_path,
--     c.value AS chunk -- Extract each chunk of the parsed text
-- FROM report_chunks,
-- LATERAL FLATTEN(INPUT => chunks) c;

SELECT
    relative_path,
    chunk
FROM "EARNINGS_REPORTS_CHUNKS"
WHERE relative_path LIKE 'TSLA_%'
ORDER BY relative_path
LIMIT 10;

In [None]:
CREATE OR REPLACE CORTEX SEARCH SERVICE EARNINGS_REPORTS_SEARCH
ON chunk
WAREHOUSE = 'COMPUTE_WH'
TARGET_LAG = '5 minutes'
AS (
    SELECT
        relative_path,
        chunk 
    FROM EARNINGS_REPORTS_CHUNKS
);

-- ALTER CORTEX SEARCH SERVICE EARNINGS_REPORTS_SEARCH 
-- SET TARGET_LAG = '15 minutes';

In [None]:
-- SHOW CORTEX SEARCH SERVICES LIKE 'EARNINGS_REPORTS_SEARCH';

SELECT *
FROM TABLE(
    CORTEX_SEARCH_DATA_SCAN(
        SERVICE_NAME => 'EARNINGS_REPORTS_SEARCH'
    )
)
WHERE relative_path LIKE 'TSLA_%'
LIMIT 10;

---
## Cleanup After Yourself!

In [None]:
----------------------------------------
-- Don't forget to drop the dynamic table to avoid wasting dollars
----------------------------------------
-- DROP DYNAMIC TABLE HISTORICAL_QUOTES_TYPED;
-- DROP VIEW HISTORICAL_QUOTES_TIMESERIES;

-- DROP SNOWFLAKE.ML.FORECAST HISTORICAL_QUOTES_FORECAST_MODEL
-- DROP TABLE HISTORICAL_QUOTES_FORECAST;

In [None]:
----------------------------------------
-- Truncate the Kafka table
----------------------------------------
-- TRUNCATE TABLE IF EXISTS "HISTORICAL-STOCK-QUOTES";

In [None]:
----------------------------------------
-- Drop earnings reports staging table
----------------------------------------
-- DROP STAGE EARNINGS_REPORTS_STAGE;
-- DROP TABLE EARNINGS_REPORTS_PARSED;
-- DROP TABLE EARNINGS_REPORTS_CHUNKS;
-- DROP CORTEX SEARCH SERVICE EARNINGS_REPORTS_SEARCH;