## Optional Step:
### If you wish to see the embedded slide images in the various cells, we need to create a stage and upload the images to that stage. 
### You can skip this step and the associated image cells.  
## Create a NOTEBOOK_IMAGES_STG to store images to display in your notebook.
```
CREATE STAGE NOTEBOOK_IMAGES_STG
	DIRECTORY = ( ENABLE = true ) 
	ENCRYPTION = ( TYPE = 'SNOWFLAKE_SSE' );
```
### Copy the following image files into the stage:
- Snowflake_DT_Usage_Stats.png
- Snowflake_DT_For_DE.png
- Snowflake_DT_Challenges_Benefits.png
- The image zip file (Snowflake_Dynamic_Table_Images.zip) can be [found here](https://github.com/rrprasan/Finance/tree/main/Snowflake/Notebooks/Technical_Indicators/VWAP_Using_Dynamic_Tables).  

In [None]:
CREATE STAGE NOTEBOOK_IMAGES_STG
	DIRECTORY = ( ENABLE = true ) 
	ENCRYPTION = ( TYPE = 'SNOWFLAKE_SSE' );

## :truck: Import Python Packages and Get Active Session to Snowflake :snowflake:

In [None]:
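# Snowpark pandas API (pandas-compatible DataFrames that run in Snowflake)
import modin.pandas as spd
# Import the Snowpark pandas plugin for modin
import snowflake.snowpark.modin.plugin
# Streamlit displays images and text; matplotlib draws the VWAP chart
import streamlit as st
import matplotlib.pyplot as plt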

from snowflake.snowpark.context import get_active_session
# Create a snowpark session
session = get_active_session()

In [None]:
image = session.file.get_stream("@NOTEBOOK_IMAGES_STG/Snowflake_DT_Usage_Stats.png" , decompress=False).read() 
# Display the image
st.image(image)

In [None]:
image = session.file.get_stream("@NOTEBOOK_IMAGES_STG/Snowflake_DT_For_DE.png" , decompress=False).read() 
# Display the image
st.image(image)

In [None]:
image = session.file.get_stream("@NOTEBOOK_IMAGES_STG/Snowflake_DT_Challenges_Benefits.png" , decompress=False).read() 
# Display the image
st.image(image)

# :snowflake: Biggest Benefits of Snowflake Dynamic Tables :snowflake:
## :snowflake: :point_right: Focus on insights, not data wrangling.
## :racehorse: :fast_forward: Define insights, Snowflake Dynamic Tables handles the rest. :airplane:
## :bulb: :chart_with_upwards_trend: Drive investment decisions faster with automated data. :gem: :dart:




## Key Concepts About Data Pipeline Architecture Using Dynamic Tables
### 1. **Source data:** Data generated by real-world entities and collected in frontline systems. This data is then ingested into Snowflake via ETL processes.

### 2. **Raw data:** After ingestion, the data is stored in Snowflake tables, where it’s transformed into a form more suitable for analysis.

### 3. **Modeled data:** These transformations result in a set of models, which present familiar concepts to consumers for analysis.

Source: [Dynamic Table Query Performance](https://docs.snowflake.com/en/user-guide/dynamic-tables-performance-queries)

## Volume Weighted Average Price Using :snowflake: Snowflake :snowflake: Dynamic Table
#### Author: [Prasanna Rajagopal](https://www.linkedin.com/in/prasannarajagopal/)
Last Updated: April, 2025
- The **Volume Weighted Average Price (VWAP)** is a **technical indicator** that represents the average price of a security weighted by its trading volume over a specific period. 
- It shows the **average price** at which a stock has traded throughout the day, with **more weight given to prices with higher volume**.
- In this Notebook, we will use Snowflake's Dynamic Tables to calculate the 20-minute VWAP. 
    - This will be our first Dynamic Table for VWAP.
    - We will call this the intermediate VWAP.
- We will use the intermediate VWAP to calculate a new VWAP that takes into account all the VWAP data available up to that point. 
    - We will call it the final VWAP.  
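
To make the definition above concrete, here is a minimal Python sketch of the VWAP formula, sum(price × volume) / sum(volume). The `vwap` helper and the sample bars are illustrative assumptions, not part of the notebook's pipeline.

```python
def vwap(trades):
    """Return sum(price * volume) / sum(volume) for (price, volume) pairs."""
    total_volume = sum(volume for _, volume in trades)
    if total_volume == 0:
        raise ValueError("VWAP is undefined when total volume is zero")
    return sum(price * volume for price, volume in trades) / total_volume


# Hypothetical bars: (price, volume)
bars = [(10.0, 100), (12.0, 300)]

simple_average = sum(price for price, _ in bars) / len(bars)
volume_weighted = vwap(bars)

print(simple_average)   # plain mean of the two prices
print(volume_weighted)  # pulled toward 12.0 because that bar traded more volume
```

The higher-volume bar dominates the result, which is the "more weight given to prices with higher volume" behavior described above.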

#### Section 1: Download and Load Source Data into Named Internal Stage in Snowflake.
#### Section 2: Load Trade Data Into Raw Table in Snowflake
#### Section 3: Creating Dynamic Tables
#### Section 4: Build the Volume Weighted Average Price Chart
#### Section 5: Managing Dynamic Tables
#### Section 6: Monitoring Dynamic Tables
#### Section 7: Cost Monitoring
#### Section 8: Dynamic Table Best Practices


In [None]:
image = session.file.get_stream("@NOTEBOOK_IMAGES_STG/VWAP_DT_Streamlit_Snowflake_Notebook.png" , decompress=False).read() 
# Display the image
st.image(image)

## Section 1: Download and Load Source Data into Named Internal Stage in Snowflake.
## VWAP Demo: Set-up Named Stage for Equity Trade Data.

In [None]:
CREATE STAGE DYNAMIC_TABLE_DEMO_STG 
	DIRECTORY = ( ENABLE = true ) 
	ENCRYPTION = ( TYPE = 'SNOWFLAKE_SSE' ) 
	COMMENT = 'Stage for Stock Prices JSON Files To Calculate the VWAP Using Dynamic Tables.';

## You have three (3) options for creating the data for this demo. 
### **Option #1:**
#### You can use your equity trade data for this demo. 
#### You will have to make appropriate changes to the code to ingest the trade data into Snowflake. 
### **Option #2:**
#### Use Snowflake Marketplace Data Providers
#### For example: [Xignite Equity & FX Price Data](https://app.snowflake.com/marketplace/listing/GZSNZ3Y8LK/xignite-equity-fx-price-data)
### **Option #3:**
#### The third option is to create an account with a data service such as [Polygon.io](https://polygon.io/).
#### You can create a Python client using the Polygon API and download the datafiles:
#### Here's the Python Code:

```python
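import json

from polygon import RESTClient

# Replace every <...> placeholder with your own values before running.
PolygonRESTclient = RESTClient(api_key="<Your API Key Here>")
trade_limit = 50000
# raw=True returns the unparsed HTTP response, so the JSON body is in aggs.data
aggs = PolygonRESTclient.get_aggs("<ticker_symbol>", 1, "minute", "<from date>", "<to date>", raw=True, limit=trade_limit)
aggs_json = json.loads(aggs.data.decode("utf-8"))
downloadLocation = "<Your File Download Location>/<Your File Name>.json"
# json.dump writes valid JSON (double quotes, true/false) rather than a Python repr
with open(downloadLocation, "w") as f:
    json.dump(aggs_json, f)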
```

## Load the files into the stage: DYNAMIC_TABLE_DEMO_STG  

## Sample 1-Minute Aggregated Equity Trade Data From [Polygon](https://polygon.io/) :chart_with_upwards_trend:
```json
{
    "ticker": "MSFT",
    "queryCount": 15267,
    "resultsCount": 15267,
    "adjusted": true,
    "results": [
        {
            "v": 211,
            "vw": 247.5947,
            "o": 247.61,
            "c": 247.61,
            "h": 247.61,
            "l": 247.61,
            "t": 1675242000000,
            "n": 7
        }
    ]
}
```
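
As a rough guide to how one entry of `results` maps onto the parsed columns used later in Section 3.1 (`t` as an epoch timestamp in milliseconds, `c` as the price, `v` as the volume), the sketch below converts the sample bar in plain Python. The `parse_bar` helper is hypothetical and only mirrors what the notebook's SQL does; it is not part of the pipeline.

```python
from datetime import datetime, timezone

# The single bar from the sample payload above.
sample_bar = {"v": 211, "vw": 247.5947, "o": 247.61, "c": 247.61,
              "h": 247.61, "l": 247.61, "t": 1675242000000, "n": 7}


def parse_bar(ticker, bar):
    """Map one raw bar to the column names used by the parsed dynamic table."""
    # 't' is milliseconds since the Unix epoch; drop tzinfo to mimic TIMESTAMP_NTZ.
    trade_time = datetime.fromtimestamp(bar["t"] / 1000, tz=timezone.utc).replace(tzinfo=None)
    return {
        "TICKER_SYMBOL": ticker,
        "TRADE_TIME": trade_time,
        "TRADE_PRICE": round(bar["c"], 4),
        "TRADE_VOLUME": bar["v"],
    }


row = parse_bar("MSFT", sample_bar)
print(row["TRADE_TIME"])  # 2023-02-01 09:00:00
```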

In [None]:
LIST @DYNAMIC_TABLE_DEMO_STG;

## Section 2: Load Trade Data Into Raw Table in Snowflake
## Create a Table To Store the Raw Data from the Source Files in the Stage.

In [None]:
CREATE OR REPLACE TRANSIENT TABLE COMPANY_STOCK_TRADES_RAW_DT_DEMO_TBL 
(
    TICKER VARIANT,
    RESULTS VARIANT
);

## Copy the Source Data from the Staged JSON Files into the Raw Table

In [None]:
COPY INTO COMPANY_STOCK_TRADES_RAW_DT_DEMO_TBL
FROM @DEMODB.EQUITY_RESEARCH.DYNAMIC_TABLE_DEMO_STG
MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE
FILE_FORMAT = (type = 'JSON' STRIP_OUTER_ARRAY = TRUE)
FORCE = TRUE; 

## Test the Raw Data Table With a SELECT statement.

In [None]:
SELECT * FROM COMPANY_STOCK_TRADES_RAW_DT_DEMO_TBL LIMIT 1;

# Section 3: Creating Dynamic Tables
## Business Goal: Create Intermediate & Final Volume Weighted Average Price (VWAP)

## [Enable Change Tracking](https://docs.snowflake.com/en/user-guide/dynamic-tables-create#enable-change-tracking)
## To create a Dynamic Table on the raw data, you need CHANGE_TRACKING turned on for the raw data table. 
- When creating a dynamic table with **incremental refresh mode**, if change tracking is not already enabled on the tables that it queries, **Snowflake automatically attempts to enable change tracking on them.** 
- In order to support incremental refreshes, change tracking must be enabled with non-zero time travel retention on all underlying objects used by a dynamic table. 
- As underlying database objects change, so does the dynamic table. If you recreate an object, you must re-enable change tracking.

In [None]:
SHOW TABLES LIKE 'COMPANY_STOCK_TRADES_RAW_DT_DEMO_TBL';

In [None]:
ALTER TABLE COMPANY_STOCK_TRADES_RAW_DT_DEMO_TBL SET CHANGE_TRACKING = TRUE;

In [None]:
SHOW WAREHOUSES;

# Section 3.1: Dynamic Table To Store Parsed Raw Data

In [None]:
image = session.file.get_stream("@NOTEBOOK_IMAGES_STG/Snowflake_DT_Interface.png" , decompress=False).read() 
# Display the image
st.image(image)

## Dynamic Table Parameters
### Required Parameters
- Name for the Dynamic Table: ```<name>```
- ```TARGET_LAG = { '<num> { seconds | minutes | hours | days }' | DOWNSTREAM }```
- ```WAREHOUSE = <warehouse_name>```
- ```AS <query>```
### A Few Other Parameters
- ```REFRESH_MODE = { AUTO | FULL | INCREMENTAL }```
    - Default: AUTO
- ```INITIALIZE = { ON_CREATE | ON_SCHEDULE }``` 
    - Default: ON_CREATE
- ```CLUSTER BY ( expr [ , expr , ... ] )```

[More Parameters](https://docs.snowflake.com/en/sql-reference/sql/create-dynamic-table)

## Full vs Incremental Refresh 
## Full refresh mode performance
- A full refresh **executes the query** and **overwrites** the dynamic table with the results. 
- The content of a dynamic table is the same regardless of whether full or incremental refresh mode is chosen.
- Full refresh is typically used for complex queries or workloads where incremental refresh is less efficient.
- To optimize full refresh performance, treat it like any other Snowflake query. 
- The cost includes both executing the query and inserting the results, not just the query execution.

[Full Refresh Mode](https://docs.snowflake.com/en/user-guide/dynamic-tables-performance-refresh-mode#full-refresh-mode-performance)

## Incremental refresh mode performance
- An **incremental refresh focuses on applying changes** since the last refresh, making it more efficient for large datasets with small updates. 
- The content of a dynamic table is the same regardless of the chosen refresh mode.
- Incremental refresh can be more **resource-efficient** because it skips reprocessing unchanged data.

[Incremental Refresh Mode](https://docs.snowflake.com/en/user-guide/dynamic-tables-performance-refresh-mode#incremental-refresh-mode-performance)
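
The difference between the two modes can be sketched with a toy grouped sum in plain Python. This is only a mental model under simplifying assumptions (insert-only changes, a single `SUM ... GROUP BY`), not a description of how Snowflake implements refreshes; both function names are hypothetical.

```python
from collections import defaultdict


def full_refresh(source_rows):
    """Recompute the aggregate from every source row and replace the result."""
    totals = defaultdict(int)
    for ticker, volume in source_rows:
        totals[ticker] += volume
    return dict(totals)


def incremental_refresh(current_totals, inserted_rows):
    """Apply only the rows inserted since the last refresh to the existing result."""
    totals = dict(current_totals)
    for ticker, volume in inserted_rows:
        totals[ticker] = totals.get(ticker, 0) + volume
    return totals


source = [("MSFT", 100), ("AAPL", 50)]
table = full_refresh(source)

new_rows = [("MSFT", 25)]
source = source + new_rows

# Both modes end with the same content; they differ in how much work they do.
print(incremental_refresh(table, new_rows))
print(full_refresh(source))
```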

## Understanding incremental refresh performance
- In an incremental refresh, most of the effort usually goes into computing changes in the dynamic table.
- Computing changes depends on the query and can be quite complex. 
- A common misunderstanding is that an incremental refresh only scans changes in the source tables, not the source tables themselves.
- For example, imagine a query that does an **inner join between tables A and B**. 
    - If a row is inserted into table A, it must be joined with table B to compute changes in the query. 
    - This single row in A can join with many rows in B, which can mean there's a lot of work even if there are only a few changes in the sources.

[Incremental refresh performance](https://docs.snowflake.com/en/user-guide/dynamic-tables-performance-refresh-mode#understanding-incremental-refresh-performance)
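
The join example above can be illustrated with a small Python sketch. It assumes a naive nested-loop inner join on a key and hypothetical tables A and B; the point is only that one inserted row in A still has to be matched against all of B.

```python
def join_changes(inserted_a_rows, table_b):
    """Rows the inner join gains when rows are inserted into A."""
    return [
        (a_key, a_value, b_value)
        for a_key, a_value in inserted_a_rows
        for b_key, b_value in table_b  # every row of B is examined
        if a_key == b_key
    ]


# Hypothetical table B: 1000 rows for MSFT and one row for AAPL.
table_b = [("MSFT", f"trade-{i}") for i in range(1000)] + [("AAPL", "trade-x")]

# A single new row arrives in table A.
inserted_a = [("MSFT", "Microsoft")]

delta = join_changes(inserted_a, table_b)
print(len(delta))  # number of output rows produced by one inserted source row
```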

## Supported queries in incremental refresh
```WITH```
- Common table expressions (CTE) that use incremental refresh supported features in the subquery.

Expressions in ```SELECT```
- Expressions including those using deterministic built-in functions and immutable user-defined functions.

```FROM```

- Source tables, views, Snowflake-managed Apache Iceberg™ tables, and other dynamic tables. 
- Subqueries outside of FROM clauses (for example, WHERE EXISTS) are not supported.

```OVER```

- All window functions.

```WHERE/HAVING/QUALIFY```

- Filters with the same expressions that are valid in SELECT.

```JOIN``` 
- Other expressions for joining tables
- Supported join types for incremental refresh include inner joins, outer-equi joins, cross joins, and lateral flatten (only the non-static FLATTEN table function). 
- You can specify any number of tables in the join, and updates to all tables in the join are reflected in the results of the query.
- Selecting the flatten SEQ column from a lateral flatten join is not supported for incremental refresh.

```UNION ALL```
- Supported with incremental refresh mode.

```GROUP BY```
- Supported with incremental refresh mode.

[Supported queries in incremental refresh](https://docs.snowflake.com/en/user-guide/dynamic-tables-refresh#supported-queries-in-incremental-refresh)

# Section 3.1: Dynamic Table 1
## Parse the Raw Data 
## Dynamic Table Name: PARSE_STOCK_TRADES_DT_DEMO_DT

In [None]:
CREATE OR REPLACE TRANSIENT DYNAMIC TABLE PARSE_STOCK_TRADES_DT_DEMO_DT
(
    TICKER_SYMBOL   VARCHAR,
    TRADE_TIME      TIMESTAMP_NTZ,
    TRADE_PRICE     NUMBER(20, 4),
    TRADE_VOLUME    NUMBER
)
TARGET_LAG = DOWNSTREAM
WAREHOUSE = VWAP_DT_WH
REFRESH_MODE = INCREMENTAL
AS
SELECT
    TICKER::VARCHAR                                 TICKER_SYMBOL,
    TO_TIMESTAMP_NTZ(TO_NUMBER(trades.VALUE:"t"),3) TRADE_TIME,
    TO_NUMBER(trades.VALUE:"c",14, 4)               TRADE_PRICE,
    TO_NUMBER(trades.VALUE:"v")                     TRADE_VOLUME
FROM 
    COMPANY_STOCK_TRADES_RAW_DT_DEMO_TBL CSTR,
    LATERAL FLATTEN (input => CSTR.RESULTS) TRADES
ORDER BY TICKER_SYMBOL, TRADE_TIME;

# Section 3.2: Dynamic Table 2
## Calculate the Volume Weighted Average Price for 20-Minute Intervals.
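
Before the SQL, here is a hedged Python sketch of the same idea: bucket each trade into a 20-minute slice, then compute sum(price × volume) / sum(volume) per slice. It assumes slices start at :00, :20, and :40 past the hour and that every slice has non-zero volume; the helper names and sample trades are illustrative only.

```python
from collections import defaultdict
from datetime import datetime


def time_slice_20min(ts):
    """Round a timestamp down to the start of its 20-minute slice."""
    return ts.replace(minute=ts.minute - ts.minute % 20, second=0, microsecond=0)


def intermediate_vwap(trades):
    """Return {slice_start: VWAP} for (timestamp, price, volume) trades."""
    sums = defaultdict(lambda: [0.0, 0])  # slice -> [sum(price * volume), sum(volume)]
    for ts, price, volume in trades:
        bucket = sums[time_slice_20min(ts)]
        bucket[0] += price * volume
        bucket[1] += volume
    return {start: pv / vol for start, (pv, vol) in sorted(sums.items())}


# Hypothetical trades: two in the 09:00 slice, one in the 09:20 slice.
trades = [
    (datetime(2023, 2, 1, 9, 1), 10.0, 100),
    (datetime(2023, 2, 1, 9, 19), 12.0, 300),
    (datetime(2023, 2, 1, 9, 20), 20.0, 50),
]

result = intermediate_vwap(trades)
for slice_start, value in result.items():
    print(slice_start, value)
```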

In [None]:
CREATE OR REPLACE TRANSIENT DYNAMIC TABLE INTERMEDIATE_VWAP_STOCK_TRADES_DT_DEMO_DT
(
    TRADE_TIME_SLICE                TIMESTAMP_NTZ,
    TICKER_SYMBOL                   VARCHAR,
    TICKER_SYMBOL_TRADE_TIME_SLICE  VARCHAR, 
    SUM_PRICE                       NUMBER(20, 4),
    SUM_VOLUME                      NUMBER,
    INTERMEDIATE_SUM_PRICE_VOLUME   NUMBER(20, 4),
    INTERMEDIATE_VWAP               NUMBER(20, 4) 
)
TARGET_LAG = DOWNSTREAM
WAREHOUSE = VWAP_DT_WH
REFRESH_MODE = INCREMENTAL
AS
SELECT
    TIME_SLICE(TRADE_TIME, 20, 'MINUTE') TRADE_TIME_SLICE,
    TICKER_SYMBOL,
    TICKER_SYMBOL || TO_VARCHAR(TRADE_TIME_SLICE) TICKER_SYMBOL_TRADE_TIME_SLICE,
    SUM(TRADE_PRICE)    SUM_PRICE,
    SUM(TRADE_VOLUME)   SUM_VOLUME,
    SUM(TRADE_PRICE * TRADE_VOLUME) INTERMEDIATE_SUM_PRICE_VOLUME,
    SUM(TRADE_PRICE * TRADE_VOLUME)/SUM(TRADE_VOLUME)  INTERMEDIATE_VWAP
FROM 
    PARSE_STOCK_TRADES_DT_DEMO_DT
GROUP BY TICKER_SYMBOL, TRADE_TIME_SLICE, TICKER_SYMBOL_TRADE_TIME_SLICE
ORDER BY TICKER_SYMBOL, TRADE_TIME_SLICE ASC;

In [None]:
SELECT * FROM INTERMEDIATE_VWAP_STOCK_TRADES_DT_DEMO_DT LIMIT 200;

# Section 3.3: Dynamic Table 3
## Calculate the Cumulative Volume Weighted Average Price Based on Available Data in the Table.
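
The cumulative VWAP is a ratio of running sums, not an average of the per-slice VWAPs. The sketch below shows that with plain Python; the `cumulative_vwap` helper and the two sample slices are assumptions made for illustration and stand in for the window functions used in the SQL.

```python
def cumulative_vwap(slices):
    """slices: (sum_price_volume, sum_volume) pairs ordered by time slice."""
    running_pv = 0.0
    running_volume = 0
    results = []
    for sum_pv, sum_volume in slices:
        running_pv += sum_pv          # running SUM(price * volume)
        running_volume += sum_volume  # running SUM(volume)
        results.append(running_pv / running_volume)
    return results


# Hypothetical slices: interval VWAPs of 11.5 (4600 / 400) and 20.0 (1000 / 50).
slices = [(4600.0, 400), (1000.0, 50)]

final_vwap = cumulative_vwap(slices)
print(final_vwap)

# Averaging the interval VWAPs would ignore how much volume each slice carried.
naive_average = (11.5 + 20.0) / 2
print(naive_average)
```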

In [None]:
CREATE OR REPLACE TRANSIENT DYNAMIC TABLE VWAP_STOCK_TRADES_DT_DEMO_DT
(
    TRADE_TIME_SLICE                    TIMESTAMP_NTZ,
    TICKER_SYMBOL                       VARCHAR,
    TICKER_SYMBOL_TRADE_TIME_SLICE      VARCHAR, 
    CUMULATIVE_PRICE                    NUMBER(20,4),
    CUMULATIVE_VOLUME                   NUMBER,
    FINAL_VWAP                          NUMBER(20,4)
)
TARGET_LAG = '30 minutes'
WAREHOUSE = VWAP_DT_WH
REFRESH_MODE = INCREMENTAL
AS
SELECT
    TRADE_TIME_SLICE,
    TICKER_SYMBOL, 
    TICKER_SYMBOL_TRADE_TIME_SLICE,
    (SUM(SUM_PRICE) OVER  (PARTITION BY TICKER_SYMBOL ORDER BY TRADE_TIME_SLICE ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)) CUMULATIVE_PRICE,
    (SUM(SUM_VOLUME) OVER (PARTITION BY TICKER_SYMBOL ORDER BY TRADE_TIME_SLICE ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)) CUMULATIVE_VOLUME,
    (SUM(INTERMEDIATE_SUM_PRICE_VOLUME) OVER (PARTITION BY TICKER_SYMBOL ORDER BY TRADE_TIME_SLICE ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW))/(SUM(SUM_VOLUME) OVER     (PARTITION BY TICKER_SYMBOL ORDER BY TRADE_TIME_SLICE ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)) FINAL_VWAP
FROM
    INTERMEDIATE_VWAP_STOCK_TRADES_DT_DEMO_DT
ORDER BY TICKER_SYMBOL, TRADE_TIME_SLICE ASC;

## Limitations
- You **can’t truncate** data from a dynamic table.
- You **can’t create a temporary dynamic table**.

[More Limitations](https://docs.snowflake.com/en/user-guide/dynamic-tables-limitations#general-limitations).

## Supported Data Types
- Dynamic tables support all Snowflake SQL data types for both incremental and full refresh, **except:**
    - **Structured data types:** Array, Object, Map.
    - **Geospatial data types (full refresh only):** Geography, Geometry.

[Supported Data Types](https://docs.snowflake.com/en/user-guide/dynamic-tables-limitations#supported-data-types)

## Limitations on Dynamic Table Query Constructs
The following constructs are not supported in the query that defines a dynamic table:
- External functions.
- Sequences.
- Functions that rely on CURRENT_USER. Dynamic table refreshes act as their owner role with a special SYSTEM user.
- Sources that include directory tables, external tables, streams, and materialized views.
- Views on dynamic tables or other unsupported objects.
- User-defined table functions (UDTF) written in SQL.
- User-defined functions (UDF) written in SQL that contain a subquery (for example, a SELECT statement).
- Importing UDFs from an external stage.
- PIVOT and UNPIVOT constructs are not supported in incremental or full refresh.
- SAMPLE / TABLESAMPLE constructs are not supported in dynamic table incremental or full refresh.

[Limitations on Query Constructs](https://docs.snowflake.com/en/user-guide/dynamic-tables-limitations#limitations-on-query-constructs)

# Section 4: Build the Volume Weighted Average Price Chart

In [None]:
# Name of the sample database and the schema to be used
SOURCE_DATA_PATH = "DEMODB.EQUITY_RESEARCH"
SAVE_DATA_PATH = "DEMODB.EQUITY_RESEARCH"
# Make sure we use the created database and schema for temp tables etc
session.use_schema(SAVE_DATA_PATH)

In [None]:
# --- Section 1: Set the Ticker for VWAP Retrieval From Dynamic Tables
compPickedForVWAP = 'MSFT'

# Display the selected company ticker symbol in the notebook output.

st.write(compPickedForVWAP)

# --- Section 2: Reading Intermediate VWAP Data from Snowflake ---
# Read the 'INTERMEDIATE_VWAP_STOCK_TRADES_DT_DEMO_DT' dynamic table into a Snowpark pandas DataFrame.
# The fully qualified name is built from the 'SOURCE_DATA_PATH' variable defined in the previous cell.
# 'spd.read_snowflake()' is the Snowpark pandas reader (modin.pandas imported as 'spd').
# The resulting DataFrame is then sorted by 'TICKER_SYMBOL' and 'TRADE_TIME_SLICE' in ascending order.


intermediate_Ticker_VWAP_df = spd.read_snowflake(f"{SOURCE_DATA_PATH}.INTERMEDIATE_VWAP_STOCK_TRADES_DT_DEMO_DT").sort_values(["TICKER_SYMBOL","TRADE_TIME_SLICE"], ascending = True)

# Filter the 'intermediate_Ticker_VWAP_df' to include only rows where the 'TICKER_SYMBOL'
# matches the company selected by the user ('compPickedForVWAP').
# The '.where()' method in pandas returns a DataFrame with NaN values where the condition is False.

filtered_intermediate_Ticker_VWAP_df = intermediate_Ticker_VWAP_df.where(intermediate_Ticker_VWAP_df['TICKER_SYMBOL'] == compPickedForVWAP)

# Remove rows containing NaN values from the filtered DataFrame. This effectively keeps only
# the data corresponding to the selected company.

filtered_intermediate_Ticker_VWAP_df = filtered_intermediate_Ticker_VWAP_df.dropna()

# --- Section 3: Reading Final VWAP Data from Snowflake ---
# Similar to Section 2, read data from the 'VWAP_STOCK_TRADES_DT_DEMO_DT' dynamic table.
# This table contains the final (cumulative) VWAP values.

final_Ticker_VWAP_df = spd.read_snowflake(f"{SOURCE_DATA_PATH}.VWAP_STOCK_TRADES_DT_DEMO_DT")

# Filter the 'final_Ticker_VWAP_df' to include only rows where the 'TICKER_SYMBOL'
# matches the company selected by the user.

filtered_final_Ticker_VWAP_df = final_Ticker_VWAP_df.where(final_Ticker_VWAP_df['TICKER_SYMBOL'] == compPickedForVWAP)

# Remove rows containing NaN values from the filtered final VWAP DataFrame.
filtered_final_Ticker_VWAP_df = filtered_final_Ticker_VWAP_df.dropna()


# --- Section 4: Merging Intermediate and Final VWAP Data ---
# Merge the filtered intermediate and final VWAP DataFrames based on a common column
# 'TICKER_SYMBOL_TRADE_TIME_SLICE'.
# - 'left_on' and 'right_on' specify the columns to use for the merge from the left and right DataFrames, respectively.
# - 'how='left'' indicates a left merge, meaning all rows from the 'filtered_intermediate_Ticker_VWAP_df'
#   will be included, and matching rows from 'filtered_final_Ticker_VWAP_df' will be joined. If there's no match
#   in the right DataFrame, the columns from the right DataFrame will have NaN values.

spd_intermediate_and_final_Ticker_vwap_df = filtered_intermediate_Ticker_VWAP_df.merge(filtered_final_Ticker_VWAP_df,
                                            left_on='TICKER_SYMBOL_TRADE_TIME_SLICE', 
                                            right_on='TICKER_SYMBOL_TRADE_TIME_SLICE', 
                                            how='left')

# --- Section 5: Preparing Data for Plotting ---
# Create a dictionary containing the data to be used for plotting.
# It extracts the 'TRADE_TIME_SLICE' from the intermediate VWAP DataFrame
# (assuming the trade time slices are consistent across both DataFrames and are represented by '_x' suffix after the merge),
# the 'INTERMEDIATE_VWAP' from the intermediate VWAP DataFrame,
# and the 'FINAL_VWAP' from the merged DataFrame (which originated from the final VWAP data).

data = {
    'TRADE_TIME_SLICE_x': spd_intermediate_and_final_Ticker_vwap_df['TRADE_TIME_SLICE_x'],
    'INTERMEDIATE_VWAP': spd_intermediate_and_final_Ticker_vwap_df['INTERMEDIATE_VWAP'],
    'FINAL_VWAP': spd_intermediate_and_final_Ticker_vwap_df['FINAL_VWAP']
}

# Create a pandas DataFrame from the prepared data dictionary.

df = spd.DataFrame(data)

# --- Section 6: Creating and Displaying the Plot ---
# Create a new figure and a set of subplots using matplotlib.
# 'figsize' sets the size of the plot.

plt.figure(figsize=(15, 6))

# Plot the 'INTERMEDIATE_VWAP' against 'TRADE_TIME_SLICE_x'.
# 'label' is used for the legend.

plt.plot(df['TRADE_TIME_SLICE_x'], df['INTERMEDIATE_VWAP'], label='INTERMEDIATE_VWAP')

# Plot the 'FINAL_VWAP' against 'TRADE_TIME_SLICE_x'.

plt.plot(df['TRADE_TIME_SLICE_x'], df['FINAL_VWAP'], label='FINAL_VWAP')

# Add a title to the plot, including the name of the selected company.

plt.title(f"{compPickedForVWAP} - Volume Weighted Average Price (VWAP)")

# Add a label to the x-axis.

plt.xlabel('TRADE')

# Add a label to the y-axis.

plt.ylabel('VWAP')

# Display the legend, which identifies the plotted lines.

plt.legend()

# Show the created plot. This will display the visualization in the Streamlit application.

plt.show()

# Section 5: Managing Dynamic Tables
```SQL 
SHOW DYNAMIC TABLES LIKE '%vwap%' IN SCHEMA DEMODB.EQUITY_RESEARCH;
```

```SQL
DESC DYNAMIC TABLE INTERMEDIATE_VWAP_STOCK_TRADES_DT_DEMO_DT;
```


In [None]:
SHOW DYNAMIC TABLES LIKE '%vwap%'; -- IN SCHEMA mydb.myschema;

In [None]:
SHOW DYNAMIC TABLES LIKE '%vwap%' IN SCHEMA DEMODB.EQUITY_RESEARCH;

In [None]:
DESC DYNAMIC TABLE INTERMEDIATE_VWAP_STOCK_TRADES_DT_DEMO_DT;

In [None]:
DESC DYNAMIC TABLE VWAP_STOCK_TRADES_DT_DEMO_DT;

## Change the Warehouse for a Dynamic Table

In [None]:
-- Current WH: VWAP_DT_WH
-- New WH: DEMO_XSMALL_WH
ALTER DYNAMIC TABLE VWAP_STOCK_TRADES_DT_DEMO_DT SET WAREHOUSE = DEMO_XSMALL_WH;

In [None]:
ALTER DYNAMIC TABLE VWAP_STOCK_TRADES_DT_DEMO_DT SET WAREHOUSE = VWAP_DT_WH;

## [Swapping Dynamic Tables](https://docs.snowflake.com/en/user-guide/dynamic-tables-manage#swap-dynamic-tables)
- Allows for a seamless transition between datasets or table versions without disrupting workflows or modifying dependent scripts. 
- If you’re developing a new version of a table but want to keep the same name for ongoing processes, swapping lets you replace the old table with the new one. 
- This approach ensures continuity while enabling updates, testing, or upgrades with minimal downtime or disruption.

In [None]:
ALTER DYNAMIC TABLE my_dynamic_table SWAP WITH my_new_dynamic_table;

## [Cluster dynamic tables](https://docs.snowflake.com/en/user-guide/dynamic-tables-manage#cluster-dynamic-tables)
- Clustering dynamic tables can improve both query efficiency and refresh operations:
    - **Query efficiency:** Clustering dynamic tables can help speed up queries, just like with regular tables, by clustering on common join keys or filter columns.
    - **Refresh operations:** Clustering can also make refreshes faster if the clustering keys align with frequent change patterns. For example, clustering by user ID can be effective when updates touch only a handful of users.

In [None]:
ALTER DYNAMIC TABLE INTERMEDIATE_VWAP_STOCK_TRADES_DT_DEMO_DT CLUSTER BY (TRADE_TIME_SLICE);

In [None]:
ALTER DYNAMIC TABLE VWAP_STOCK_TRADES_DT_DEMO_DT CLUSTER BY (TRADE_TIME_SLICE);

## [Drop Dynamic Table](https://docs.snowflake.com/en/user-guide/dynamic-tables-manage#drop-existing-dynamic-tables)

In [None]:
DROP DYNAMIC TABLE INTERMEDIATE_VWAP_STOCK_TRADES_DT_DEMO_DT;

## [Restore Dynamic Table](https://docs.snowflake.com/en/user-guide/dynamic-tables-manage#restore-dropped-dynamic-tables)
- You can only undrop a dynamic table within its retention period (default is 24 hours).
- You must have the OWNERSHIP privilege on that dynamic table.

In [None]:
UNDROP DYNAMIC TABLE INTERMEDIATE_VWAP_STOCK_TRADES_DT_DEMO_DT;

## [Understanding the effects of changes to columns in base tables](https://docs.snowflake.com/en/user-guide/dynamic-tables-manage#understanding-the-effects-of-changes-to-columns-in-base-tables)
- Change: **New column added or Existing unused column removed.**
    - Impact: None. If a new column is added to the base table or an unused column is deleted, no action occurs and refreshes continue as before.
- Change: **Underlying base table is recreated with identical column names and types**, or **an underlying base table column is recreated with the same name and type.**
    - Impact: Full refresh/reinitialize: During the next refresh cycle, a full refresh is done to ensure that no incorrect or stale data is in the dynamic table.
- Change: **An underlying column or other element used by a dynamic table changes in name or in some other way.**
    - Impact: The state of the dynamic table changes to FAILED. The dynamic table must be recreated to respond to the change.


## [Suspend or resume dynamic tables](https://docs.snowflake.com/en/user-guide/dynamic-tables-manage#suspend-or-resume-dynamic-tables)
- Dynamic tables are automatically suspended after five consecutive scheduled refresh errors. 
- A successful refresh, including a manual refresh, resets the error count to zero. 
- For example, if a table fails two consecutive scheduled refreshes, then succeeds on the next, the error count resets to zero.
- Errors from manually triggered refreshes don’t count toward this limit.
- Any dynamic tables dependent on a suspended table are also suspended.
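
The counting rules above can be sketched as a small simulation. This is a toy model of the behavior as described in the bullets, with a hypothetical `simulate` function and event format; it does not model the cascade to dependent dynamic tables.

```python
SUSPEND_AFTER = 5  # consecutive scheduled-refresh errors, per the rules above


def simulate(refreshes):
    """refreshes: (kind, succeeded) pairs, where kind is 'scheduled' or 'manual'.

    Returns (consecutive_error_count, suspended).
    """
    errors = 0
    for kind, succeeded in refreshes:
        if succeeded:
            errors = 0       # any successful refresh resets the count
        elif kind == "scheduled":
            errors += 1      # only scheduled failures count toward the limit
            if errors >= SUSPEND_AFTER:
                return errors, True
    return errors, False


# Two scheduled failures followed by a success: the count resets.
print(simulate([("scheduled", False), ("scheduled", False), ("scheduled", True)]))

# Four scheduled failures and one manual failure: still below the limit.
print(simulate([("scheduled", False)] * 4 + [("manual", False)]))

# Five scheduled failures in a row: the table is suspended.
print(simulate([("scheduled", False)] * 5))
```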

In [None]:
ALTER DYNAMIC TABLE INTERMEDIATE_VWAP_STOCK_TRADES_DT_DEMO_DT SUSPEND;

In [None]:
ALTER DYNAMIC TABLE INTERMEDIATE_VWAP_STOCK_TRADES_DT_DEMO_DT RESUME;

## [Manually refresh a dynamic table](https://docs.snowflake.com/en/user-guide/dynamic-tables-manage#manually-refresh-a-dynamic-table)
- To get the latest data, manually refresh a dynamic table before its next scheduled refresh. 
- This is useful for large target lags or one-time freshness needs. 
- For example, if a dynamic table is configured with a large target lag and its next refresh is hours away, a manual refresh ensures up-to-date data.
- Manual refreshes are never skipped but they can cause scheduled refreshes to skip, especially if you perform frequent manual refreshes on a dynamic table.
- Doing so can prevent downstream dynamic tables from refreshing. 
- For this reason, Snowflake recommends that you avoid frequently performing manual refreshes on a dynamic table with downstream dynamic tables that are expected to refresh according to target lag.

In [None]:
ALTER DYNAMIC TABLE INTERMEDIATE_VWAP_STOCK_TRADES_DT_DEMO_DT REFRESH;

# Section 6: Monitoring Dynamic Tables

## Use Dynamic Table Graph History to Check Scheduling State.

In [None]:
--
-- Look for SCHEDULING_STATE to Find out the State of the Dynamic Table.  
-- INPUT column shows the upstream object dependency.  
-- 
SELECT *
  FROM TABLE (INFORMATION_SCHEMA.DYNAMIC_TABLE_GRAPH_HISTORY());

## A sample error stored in the scheduling state column
```json
{
  "reason_code": "SUSPENDED_DUE_TO_ERRORS",
  "reason_message": "The DT was suspended due to 5 consecutive refresh errors",
  "state": "SUSPENDED",
  "suspended_on": "2025-04-05 07:11:35.662 -0700"
}
```

## [Determine the optimal target lag for a dynamic table](https://docs.snowflake.com/en/user-guide/dynamic-tables-target-lag#determine-the-optimal-target-lag-for-a-dynamic-table)
### Checking Dynamic Table Refresh History
- Snowflake schedules refreshes slightly earlier to allow time for the refresh to complete. 
- For example, if you set the target lag to 5 minutes, it doesn’t mean the table will refresh exactly every 5 minutes. 
- Actual refresh intervals might be shorter than the specified lag. 
- If you want more consistent 5-minute refreshes, consider increasing the target lag slightly.
- You can use either the DYNAMIC_TABLE_REFRESH_HISTORY table function in INFORMATION_SCHEMA or Snowsight to determine the optimal target lag time per your requirements. 
- For example, analyze refresh details, including duration and skipped refreshes, to make an informed decision.
- **This table function returns information about each refresh (completed and running) of dynamic tables.**
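
One way to turn refresh history into a target-lag decision is to summarize how long refreshes take and how often they are skipped. The sketch below works on hand-written records: the field names (`state`, `duration_s`), the state values, and the `summarize` helper are all hypothetical and are not the actual output columns of `DYNAMIC_TABLE_REFRESH_HISTORY`.

```python
from statistics import mean

# Hypothetical refresh records; in practice these would be derived from the refresh history.
history = [
    {"state": "SUCCEEDED", "duration_s": 42},
    {"state": "SUCCEEDED", "duration_s": 65},
    {"state": "SKIPPED", "duration_s": 0},
    {"state": "SUCCEEDED", "duration_s": 58},
]


def summarize(rows):
    """Summarize completed refresh durations and count skipped refreshes."""
    durations = [r["duration_s"] for r in rows if r["state"] == "SUCCEEDED"]
    return {
        "refreshes": len(durations),
        "skipped": sum(1 for r in rows if r["state"] == "SKIPPED"),
        "mean_s": mean(durations),
        "max_s": max(durations),
    }


summary = summarize(history)
print(summary)
```

If the longest refresh approaches the target lag, or refreshes are skipped regularly, that suggests the target lag may be too tight for the workload.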

In [None]:
-- Function Called Without Database Name or Name Prefix.  
-- 
-- SELECT
--     *
-- FROM
--   TABLE (
--     INFORMATION_SCHEMA.DYNAMIC_TABLE_REFRESH_HISTORY (
--     )
--   )
-- 
-- NAME_PREFIX => 'DEMODB.EQUITY_RESEARCH.'
--
-- SELECT
--     *
-- FROM
--   TABLE (
--     INFORMATION_SCHEMA.DYNAMIC_TABLE_REFRESH_HISTORY (
--         NAME_PREFIX => 'DEMODB.EQUITY_RESEARCH.'
--     )
--   )
-- With NAME of the Dynamic Table
SELECT
    *
FROM
  TABLE (
    INFORMATION_SCHEMA.DYNAMIC_TABLE_REFRESH_HISTORY (
        NAME => 'DEMODB.EQUITY_RESEARCH.INTERMEDIATE_VWAP_STOCK_TRADES_DT_DEMO_DT'
    )
  )

## Show Dynamic Tables with Refresh Errors

In [None]:
-- Show Dynamic Tables with Refresh Errors
SELECT
    *
FROM
  TABLE (
    INFORMATION_SCHEMA.DYNAMIC_TABLE_REFRESH_HISTORY (
      NAME_PREFIX => 'DEMODB.EQUITY_RESEARCH.', ERROR_ONLY => TRUE
    )
  )
-- WHERE
--    NAME ILIKE '%VWAP%';

In [None]:
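-- Join the intermediate and final VWAP rows on the combined ticker + time-slice key.
SELECT *
FROM INTERMEDIATE_VWAP_STOCK_TRADES_DT_DEMO_DT IDT
JOIN VWAP_STOCK_TRADES_DT_DEMO_DT FDT
  ON IDT.TICKER_SYMBOL_TRADE_TIME_SLICE = FDT.TICKER_SYMBOL_TRADE_TIME_SLICE;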

## Introduction to [Event Table](https://docs.snowflake.com/en/developer-guide/logging-tracing/event-table-setting-up) in Snowflake :snowflake:
As your Snowflake objects—including procedures and UDFs—emit telemetry data, Snowflake collects the data in an event table whose data is available for queries. Snowflake includes an event table by default, but you can also create a new one.
To collect telemetry data, you must have an active event table and have set telemetry levels to allow data collection. If you **don’t already have an active event table**, Snowflake **makes the default event table the active event table.**
### [What is an Event Table?](https://docs.snowflake.com/en/developer-guide/logging-tracing/event-table-setting-up#what-is-an-event-table)
An event table is a special kind of database table with a predefined set of columns. The table’s structure supports the data model for OpenTelemetry, a framework for handling telemetry data. When an event table is active, Snowflake collects telemetry data in the table—including data that Snowflake itself generates and data that you emit by instrumenting your handler code using certain APIs. You can view the collected data by executing SQL queries.
### [Cost of Telemetry Data Collection](https://docs.snowflake.com/en/developer-guide/logging-tracing/logging-tracing-billing)
When you log messages from a function or procedure, Snowflake collects the messages in batches and ingests the batches into the event table.

To perform this work, Snowflake uses Snowflake-managed resources, also referred to as the serverless compute model. As is the case with other serverless features, Snowflake bills your account for the compute resource and cloud services usage needed to ingest the logged messages. These costs appear on your bill as separate line items.

To determine the credit usage for logging over time, use the ```EVENT_USAGE_HISTORY``` view.

### [Event Capture Levels](https://docs.snowflake.com/en/user-guide/dynamic-tables-monitor#set-the-severity-level-of-the-events-to-capture)

```ERROR```: Refresh failure events.

```WARN```: Failures to refresh upstream dynamic tables and refresh failure events.

```INFO```: Successful refresh events, failures to refresh upstream dynamic tables, and refresh failure events.
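The three levels are cumulative: each level records everything the stricter level records, plus one more category. The following is a minimal sketch of that relationship in plain Python. It is an illustrative model, not Snowflake code; the event-kind labels (`refresh_failure`, `upstream_failure`, `refresh_success`) and the helper `is_captured` are hypothetical names for the three event categories listed above.

```python
# Illustrative model of the capture levels described above (not a Snowflake API).
# The event-kind names are hypothetical labels for the three event categories.
CAPTURED_EVENTS = {
    "ERROR": {"refresh_failure"},
    "WARN": {"refresh_failure", "upstream_failure"},
    "INFO": {"refresh_failure", "upstream_failure", "refresh_success"},
}


def is_captured(log_level: str, event_kind: str) -> bool:
    """Return True if an event of this kind is recorded at the given level."""
    return event_kind in CAPTURED_EVENTS[log_level.upper()]


# Print what each level records, from strictest to most verbose.
for level in ("ERROR", "WARN", "INFO"):
    print(level, sorted(CAPTURED_EVENTS[level]))
```

In this model, successful refreshes only show up in the event table at ```INFO```, so a lower level would be expected to produce fewer rows and less ingestion cost.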



## Warn-level Event Collection for All Objects in the Account

In [None]:
ALTER ACCOUNT SET LOG_LEVEL = WARN;

## Info-level Event Collection for All Objects in a Database

In [None]:
ALTER DATABASE DEMODB SET LOG_LEVEL = INFO;

## Warn-level Event Collection for a Dynamic Table

In [None]:
ALTER DYNAMIC TABLE INTERMEDIATE_VWAP_STOCK_TRADES_DT SET LOG_LEVEL = WARN;

## Disable the collection of logging and tracing events in the account

In [None]:
ALTER ACCOUNT SET EVENT_TABLE = NONE;

In [None]:
SELECT * FROM SNOWFLAKE.TELEMETRY.EVENTS_VIEW
      WHERE resource_attributes:"snow.executable.type" = 'dynamic_table'
        -- AND resource_attributes:"snow.database.name" = 'DEMODB'
        -- AND record_attributes:"event.name" = 'refresh.status'
        -- AND record:"severity_text" = 'WARN'
        -- AND value:"state" = 'FAILED'

## Set Up E-mail Notification for Monitoring

In [None]:
CREATE OR REPLACE NOTIFICATION INTEGRATION MY_EMAIL_INTEGRATION
TYPE = EMAIL
ENABLED = TRUE
ALLOWED_RECIPIENTS = ('prasanna.rajagopal@snowflake.com');

## Test the E-mail Notification.

In [None]:
CALL SYSTEM$SEND_EMAIL(
    'MY_EMAIL_INTEGRATION',  -- Notification integration name
    'prasanna.rajagopal@snowflake.com',      -- Recipient email address
    'Alert: Testing Dynamic Table', -- Email subject
    'Testing DT' -- Email body
);

In [None]:
CREATE ALERT my_alert_on_dt_refreshes
  IF( EXISTS(
    SELECT * FROM SNOWFLAKE.TELEMETRY.EVENTS_VIEW
      WHERE resource_attributes:"snow.executable.type" = 'dynamic_table'
        AND resource_attributes:"snow.database.name" = 'DEMODB'
        AND record_attributes:"event.name" = 'refresh.status'
        AND record:"severity_text" = 'ERROR'
        AND value:"state" = 'FAILED'))
  THEN
    BEGIN
      LET result_str VARCHAR;
      (SELECT ARRAY_TO_STRING(ARRAY_AGG(name)::ARRAY, ',') INTO :result_str
         FROM (
           SELECT resource_attributes:"snow.executable.name"::VARCHAR name
             FROM TABLE(RESULT_SCAN(SNOWFLAKE.ALERT.GET_CONDITION_QUERY_UUID()))
             LIMIT 10
         )
      );
      CALL SYSTEM$SEND_EMAIL(
        'MY_EMAIL_INTEGRATION',  -- Notification integration name
        'prasanna.rajagopal@snowflake.com',      -- Recipient email address
        'Alert: Dynamic Table refresh failed', -- Email subject
        :result_str -- Email body
        );
    END;

## [Query the event table to monitor refreshes](https://docs.snowflake.com/en/user-guide/dynamic-tables-monitor#query-the-event-table-to-monitor-refreshes)
- Change the database name in the ```WHERE``` clause below to match your database name.

```resource_attributes:"snow.database.name" = 'DEMODB'```

In [None]:
SELECT
    timestamp,
    resource_attributes:"snow.executable.name"::VARCHAR AS dt_name,
    resource_attributes:"snow.query.id"::VARCHAR AS query_id,
    value:message::VARCHAR AS error
  FROM SNOWFLAKE.TELEMETRY.EVENTS_VIEW
  WHERE
    resource_attributes:"snow.executable.type" = 'DYNAMIC_TABLE' AND
    resource_attributes:"snow.database.name" = 'DEMODB' AND
    value:state = 'FAILED'
  ORDER BY timestamp DESC;

# Section 7: Cost Monitoring

## [Understanding cost for dynamic tables](https://docs.snowflake.com/en/user-guide/dynamic-tables-cost)
### Dynamic Tables incur compute and storage costs. 
### [Compute Costs](https://docs.snowflake.com/en/user-guide/dynamic-tables-cost#compute-cost)
There are two compute costs associated with dynamic tables: 
- **Virtual warehouses**
- **Cloud Services compute**

Dynamic tables require virtual warehouses to refresh, that is, to run queries against base objects when they are initialized and refreshed, including both scheduled and manual refreshes. These operations use compute resources, which consume credits.

Dynamic tables also require Cloud Services compute to identify changes in underlying base objects and whether the virtual warehouse needs to be invoked. If no changes are identified, virtual warehouse credits aren’t consumed since there’s no new data to refresh. Note that there may be instances where changes in base objects are filtered out in the dynamic table query. In such scenarios, virtual warehouse credits are consumed because the dynamic table undergoes a refresh to determine whether the changes are applicable.

**If the associated virtual warehouse is suspended and no changes in base objects are identified, the suspended virtual warehouse doesn’t get invoked and no credits are consumed.** 

Conversely, if changes are identified, the virtual warehouse is automatically resumed to process the updates.

Dynamic table refreshes are driven by the configured target lag. Dynamic table pipelines with lower target lag refresh more often and therefore incur higher compute costs.
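The rules above can be condensed into a small decision model. This is only a sketch of the behavior as described in this section, written in plain Python; the `RefreshCheck` class and `check_refresh` function are hypothetical names, and real credit consumption depends on the actual work performed.

```python
from dataclasses import dataclass


@dataclass
class RefreshCheck:
    """Outcome of one scheduled refresh check (hypothetical model)."""
    uses_cloud_services: bool     # change detection always runs
    resumes_warehouse: bool       # a suspended warehouse gets resumed
    uses_warehouse_credits: bool  # the refresh query runs on the warehouse


def check_refresh(base_changed: bool, warehouse_suspended: bool) -> RefreshCheck:
    """Model which resources a refresh check touches.

    `base_changed` is True whenever the base objects changed, even if the
    dynamic table query later filters those changes out.
    """
    return RefreshCheck(
        uses_cloud_services=True,
        resumes_warehouse=base_changed and warehouse_suspended,
        uses_warehouse_credits=base_changed,
    )


# No changes and a suspended warehouse: the warehouse stays suspended.
print(check_refresh(base_changed=False, warehouse_suspended=True))
# Changes detected: the warehouse is resumed and credits are consumed.
print(check_refresh(base_changed=True, warehouse_suspended=True))
```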

### [Storage Costs](https://docs.snowflake.com/en/user-guide/dynamic-tables-cost#storage-cost)
Dynamic tables require storage to hold the materialized results. As with regular tables, you may incur additional storage costs for Time Travel, Fail-safe, and cloning.
### Potential for storage cost savings
- [Transient Dynamic Tables](https://docs.snowflake.com/en/user-guide/dynamic-tables-cost#transient-dynamic-tables)
- [Dynamic Iceberg Table](https://docs.snowflake.com/en/user-guide/dynamic-tables-create-iceberg)



## WAREHOUSE_METERING_HISTORY
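```WAREHOUSE_METERING_HISTORY``` returns the **hourly credit usage** for a single warehouse (or all the warehouses in your account). This notebook queries the Account Usage view of that name and filters it to a date range; Snowflake also provides an ```INFORMATION_SCHEMA``` table function with the same name that takes the date range as arguments.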

In [None]:
SELECT
    warehouse_name,
    credits_used,
    credits_used_compute,
    credits_used_cloud_services,
    -- Compute credits not attributed to any query, i.e. warehouse idle time
    (credits_used_compute - credits_attributed_compute_queries) AS idle_compute_cost,
    -- Idle share of compute credits as a fraction (multiply by 100 for a percentage);
    -- DIV0 returns 0 when the divisor is 0
    DIV0(idle_compute_cost, credits_used_compute) AS idle_as_percent_of_compute_credits
FROM
    SNOWFLAKE.ACCOUNT_USAGE.WAREHOUSE_METERING_HISTORY
WHERE start_time >= DATEADD('days', -20, CURRENT_DATE())
  AND end_time < CURRENT_DATE()
ORDER BY warehouse_name;
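To make the idle-cost arithmetic in the query concrete, here is a minimal Python sketch of the same calculation for a single hourly row. The helper names `div0` and `idle_metrics` are hypothetical, and the credit figures are made up for illustration.

```python
def div0(numerator: float, denominator: float) -> float:
    """Mimic Snowflake's DIV0: return 0 instead of failing on a zero divisor."""
    return 0.0 if denominator == 0 else numerator / denominator


def idle_metrics(credits_used_compute: float,
                 credits_attributed_compute_queries: float):
    """Return (idle credits, idle share of compute credits) for one row."""
    idle = credits_used_compute - credits_attributed_compute_queries
    return idle, div0(idle, credits_used_compute)


# Made-up row: 2.0 compute credits used, 1.5 of them attributed to queries.
idle, share = idle_metrics(2.0, 1.5)
print(idle, share)  # 0.5 0.25
```

A share of 0.25 means a quarter of the compute credits in that hour were spent while no query was running.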

## [QUERY_ATTRIBUTION_HISTORY](https://docs.snowflake.com/en/sql-reference/account-usage/query_attribution_history) view
The value in the ```credits_attributed_compute``` column contains the warehouse credit usage for executing the query, inclusive of any resizing and/or autoscaling of multi-cluster warehouse(s). This cost is attributed based on the weighted average of the resource consumption.

The value doesn’t include any credit usage for warehouse idle time. Idle time is a period of time in which no queries are running in the warehouse and can be measured at the warehouse level.

The value doesn’t include any other credit usage that is incurred as a result of query execution. For example, the following are not included in the query cost:

- Data transfer costs

- Storage costs

- Cloud services costs

- Costs for serverless features

- Costs for tokens processed by AI services

In [None]:
SELECT * FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_ATTRIBUTION_HISTORY WHERE WAREHOUSE_NAME = 'VWAP_DT_WH' LIMIT 10;

In [None]:
SELECT
    usage_date,
    credits_used_cloud_services,
    credits_adjustment_cloud_services,
    credits_used_cloud_services + credits_adjustment_cloud_services AS billed_cloud_services
FROM 
    snowflake.account_usage.metering_daily_history
WHERE
    SERVICE_TYPE = 'WAREHOUSE_METERING'
AND
    usage_date >= DATEADD(month,-1,CURRENT_TIMESTAMP())
AND 
    credits_used_cloud_services > 0
ORDER BY 4 DESC;
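The `billed_cloud_services` column adds a (negative) adjustment to the cloud services credits used. The sketch below shows that arithmetic in plain Python. It assumes the commonly documented rule that cloud services credits are billed only to the extent they exceed 10% of the day's warehouse compute credits; the function names are hypothetical and the figures are made up.

```python
def cloud_services_adjustment(credits_used_compute: float,
                              credits_used_cloud_services: float) -> float:
    """Assumed rule: waive cloud services credits up to 10% of daily compute credits."""
    waived = min(credits_used_cloud_services, credits_used_compute / 10)
    return -waived  # adjustments are expressed as negative credits


def billed_cloud_services(credits_used_cloud_services: float,
                          credits_adjustment_cloud_services: float) -> float:
    """Same arithmetic as the billed_cloud_services column in the query."""
    return credits_used_cloud_services + credits_adjustment_cloud_services


# Made-up daily figures: 100 compute credits, 12 cloud services credits.
adjustment = cloud_services_adjustment(100.0, 12.0)
print(billed_cloud_services(12.0, adjustment))  # 2.0
```

Under this assumption, a day with 8 cloud services credits against 100 compute credits would be fully adjusted and bill no cloud services credits.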

# Section 8: [Dynamic Table Best Practices](https://docs.snowflake.com/en/user-guide/dynamic-tables-best-practices#best-practices-for-creating-dynamic-tables)
### 1. [Chain together pipelines of dynamic tables](https://docs.snowflake.com/en/user-guide/dynamic-tables-best-practices#chain-together-pipelines-of-dynamic-tables)
- When defining a new dynamic table, rather than defining a large dynamic table with many nested statements, use small dynamic tables with pipelines instead.
### 2. Use a [“controller” dynamic table](https://docs.snowflake.com/en/user-guide/dynamic-tables-best-practices#use-a-controller-dynamic-table-for-complex-task-graphs) for complex task graphs
- When you have a complex graph of dynamic tables with many roots and leaves and you want to perform operations (e.g. changing lag, manual refresh, suspension) on the full task graph with a single command, do the following:
    - Set the value for the TARGET_LAG of all of your dynamic tables to DOWNSTREAM.
    - Create a “controller” dynamic table that reads from all of the leaves in your task graph. To ensure this controller doesn’t consume resources, do the following:

```SQL
CREATE DYNAMIC TABLE controller
    TARGET_LAG = <target_lag>
    WAREHOUSE = <warehouse>
AS
    SELECT 1 A FROM <leaf1>, …, <leafN> LIMIT 0;
```
- Use the controller to control the whole graph. For example, set a new target lag for the task graph:

```SQL
ALTER DYNAMIC TABLE controller SET TARGET_LAG = <new_target_lag>;
```
- Or manually refresh the task graph:

```SQL
ALTER DYNAMIC TABLE controller REFRESH;
```
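When a graph has many leaves, writing the controller statement by hand gets tedious. The following Python sketch builds the `CREATE DYNAMIC TABLE` text shown earlier from a list of leaf names. The helper `controller_ddl` and the table and warehouse names are hypothetical, and the function does no identifier validation, so it should only be fed trusted names.

```python
from typing import Sequence


def controller_ddl(leaves: Sequence[str], target_lag: str, warehouse: str,
                   name: str = "controller") -> str:
    """Build the controller DDL; target_lag is a duration such as '5 minutes'."""
    if not leaves:
        raise ValueError("at least one leaf dynamic table is required")
    from_clause = ", ".join(leaves)
    return (
        f"CREATE DYNAMIC TABLE {name}\n"
        f"    TARGET_LAG = '{target_lag}'\n"
        f"    WAREHOUSE = {warehouse}\n"
        f"AS\n"
        f"    SELECT 1 A FROM {from_clause} LIMIT 0;"
    )


# Hypothetical leaf tables and warehouse name.
print(controller_ddl(["leaf_a", "leaf_b"], "5 minutes", "my_wh"))
```

The generated text could then be reviewed and run in a SQL cell.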
### 3. [Cloning pipelines of dynamic tables](https://docs.snowflake.com/en/user-guide/dynamic-tables-best-practices#about-cloning-pipelines-of-dynamic-tables)
- Clone all elements of the dynamic table pipeline in the same clone command to avoid reinitializations of your pipeline.
- You can do this by **consolidating all elements of the pipeline (e.g. base tables, views, and dynamic tables) in the same schema or database.**
### 4. [Use transient dynamic tables to reduce storage cost](https://docs.snowflake.com/en/user-guide/dynamic-tables-best-practices#use-transient-dynamic-tables-to-reduce-storage-cost)
- Transient dynamic tables maintain data reliably over time and support Time Travel within the data retention period, but don’t retain data beyond the fail-safe period. 
- By default, dynamic table data is retained for 7 days in fail-safe storage. For dynamic tables with high refresh throughput, this can significantly increase storage consumption. 
- Therefore, you should make a dynamic table transient only if its data doesn’t need the same level of data protection and recovery provided by permanent tables.
### 5. Use dedicated warehouses for refreshes
### 6. [Use downstream lag](https://docs.snowflake.com/en/user-guide/dynamic-tables-best-practices#use-downstream-lag)
- Downstream lag indicates that the dynamic table should refresh when other dependent dynamic tables require refreshing. 
- You should use downstream lag as a best practice because of its ease of use and cost effectiveness. 
- Without downstream lag, **managing a chain of complex dynamic tables would require individually assigning each table its own target lag** and managing the associated constraints, instead of only monitoring the data freshness of the final table. 
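One way to picture downstream lag is as a value inherited from the tables that consume a given table. The Python sketch below models that idea for an acyclic pipeline. It is a simplification rather than Snowflake's scheduler; the function `effective_lag` and the table names are hypothetical.

```python
from typing import Dict, List, Optional, Union

Lag = Union[int, str]  # target lag in seconds, or the literal "DOWNSTREAM"


def effective_lag(table: str, target_lag: Dict[str, Lag],
                  consumers: Dict[str, List[str]]) -> Optional[int]:
    """Return the lag (in seconds) a table must meet, or None if nothing drives it.

    A DOWNSTREAM table inherits the tightest lag among the dynamic tables
    that read from it. Assumes the dependency graph has no cycles.
    """
    lag = target_lag[table]
    if lag != "DOWNSTREAM":
        return int(lag)
    inherited = [effective_lag(c, target_lag, consumers)
                 for c in consumers.get(table, [])]
    inherited = [value for value in inherited if value is not None]
    return min(inherited) if inherited else None


# Hypothetical two-step pipeline: only the final table has a time-based lag.
target_lag = {"intermediate_dt": "DOWNSTREAM", "final_dt": 60}
consumers = {"intermediate_dt": ["final_dt"]}
print(effective_lag("intermediate_dt", target_lag, consumers))  # 60
```

In this model only the final table carries an explicit freshness requirement, which matches the point above about monitoring just the final table.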
### 7. [Set the refresh mode for all production dynamic tables](https://docs.snowflake.com/en/user-guide/dynamic-tables-best-practices#set-the-refresh-mode-for-all-production-dynamic-tables)
- ```AUTO```
    - The system attempts to apply incremental refresh by default. 
    - When incremental refresh isn’t supported or might not perform well, the dynamic table automatically selects full refresh instead.
- ```INCREMENTAL```
- ```FULL```