# Part 3. Enrich Models With Snowflake Data Marketplace and Use Native Snowpark Operations to Manipulate the Data

1. Get Data From the Data Marketplace
2. Create a session for Snowpark with Snowflake
3. Import the Data
4. Manipulate the Data
5. Update Your Model


# 3.0 Imports

TO DO: Just run the cell

In [1]:
import json
import pandas as pd
from snowflake.snowpark.session import Session
import snowflake.snowpark.functions as F
from snowflake.snowpark.types import PandasDataFrameType, IntegerType, StringType, FloatType, DateType
from snowflake.ml.modeling.xgboost import XGBRegressor
from snowflake.ml.modeling.linear_model import LinearRegression
from snowflake.ml.registry import registry
from snowflake.ml._internal.utils import identifier

# 3.1 Get the Data Share

TO DO: Go to the hyperlink below, log in to your Snowflake account, and click the "GET" button. This enables you to retrieve data from the Data Share.


https://app.snowflake.com/marketplace/listing/GZT1ZA3NLF/similarweb-ltd-global-stocks-25-000-tickers-digital-traffic-data-by-domain?search=Global%20Stocks

# 3.2 Read Snowflake Connection Details and Create a Session
TO DO: Just run the cell

In [2]:
with open("/Users/mitaylor/Documents/creds/creds_sf_azure.json") as f:  # <--- Update here
    snowflake_connection_cfg = json.load(f)
session = Session.builder.configs(snowflake_connection_cfg).create()
session.sql("USE DATABASE HOL_DEMO").collect()
session.sql("CREATE OR REPLACE WAREHOUSE ASYNC_WH WITH WAREHOUSE_SIZE='MEDIUM' WAREHOUSE_TYPE = 'SNOWPARK-OPTIMIZED'").collect()

[Row(status='Warehouse ASYNC_WH successfully created.')]
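
The credentials file is plain JSON whose keys are passed straight to `Session.builder.configs`. As a minimal sketch, assuming the file holds the common connection keys `account`, `user`, and `password` (the helper name `load_connection_config` and the required-key list are illustrative and not part of the lab), you could validate the file before building the session:

```python
import json

# Assumed minimum set of keys; your file may rely on other authentication options.
REQUIRED_KEYS = ("account", "user", "password")


def load_connection_config(path):
    """Hypothetical helper: read a JSON credentials file and check required keys."""
    with open(path) as f:
        cfg = json.load(f)
    missing = [key for key in REQUIRED_KEYS if key not in cfg]
    if missing:
        raise KeyError(f"Missing connection keys: {missing}")
    return cfg
```

The returned dictionary can be passed to `Session.builder.configs(...)` in the same way as `snowflake_connection_cfg` in the cell above.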

# 3.3 Use SQL to import the data

TO DO: Just run the cells

In [None]:
sdf = session.sql("""
SELECT ts.date,
       ts.variable_name,
       ts.value
FROM FINANCIAL__ECONOMIC_ESSENTIALS.cybersyn.financial_fred_timeseries AS ts
JOIN FINANCIAL__ECONOMIC_ESSENTIALS.cybersyn.financial_fred_attributes AS att
    ON (att.variable = ts.variable)
WHERE variable_group IN ('Bank of Brazil Selic Interest Rate Target',
                         'Bank of Canada Overnight Lending Rate',
                         'Bank of England Official Bank Rate',
                         'Bank of Japan Policy-Rate Balance Rate',
                         'Bank of Mexico Official Overnight Target Rate',
                         'ECB Main Refinancing Operations Rate: Fixed Rate Tenders for Euro Area',
                         'Federal Funds Effective Rate')""")
sdf.limit(5).to_pandas()

In [4]:
sdf.write.save_as_table("STREAMLIT_TEST", mode="overwrite")

# 3.4 Use Native Snowpark to Manipulate the Data
## 3.4.1 Filtering

TODO:

1. Trim the dataframe columns to just the two we care about for now - "DATE" and "VALUE"

Hints:

To select a specific set of Spark DataFrame columns, you would use the syntax spark_dataframe[["col_name1", "col_name2"]]. Snowpark uses the same syntax.



In [None]:
sdf_trimmed = # do part 1 here
sdf_trimmed.show()

## 3.4.2 GroupBy

TODO:

1. Group by date to get the average overnight rate across the central banks for each date

Hints:

Spark's groupBy looks like this: grouped_df = df.groupBy("the column you want to group by").agg(mean("temperature"))

Snowpark invokes "group by" via "group_by", but is otherwise the same.
Snowpark's "mean" function is imported as "mean_" in the cell below.
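
To make the goal of this step concrete, here is a plain-Python sketch (not Snowpark) of what grouping by date and averaging computes. The sample rows and the helper name `mean_by_date` are made up for illustration:

```python
from collections import defaultdict
from statistics import mean


def mean_by_date(rows):
    """Average the value per date for an iterable of (date, value) pairs."""
    buckets = defaultdict(list)
    for date, value in rows:
        buckets[date].append(value)
    return {date: mean(values) for date, values in buckets.items()}


# Illustrative rows: two central-bank rates per date.
sample = [
    ("2022-01-01", 1.0),
    ("2022-01-01", 3.0),
    ("2022-01-02", 2.0),
    ("2022-01-02", 4.0),
]
print(mean_by_date(sample))
```

Each date ends up with a single averaged value, which is the shape `sdf_grouped` should have after your `group_by` and aggregation.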


In [5]:
from snowflake.snowpark.functions import mean as mean_

In [None]:
sdf_grouped = # do part 1 here
sdf_grouped.limit(5).to_pandas()

In [7]:
sdf_grouped = sdf_grouped.filter((F.col("DATE") >= '2022-01-01'))

## 3.4.3 Join it with our Prior Data

TO DOs:

1. Read in your saved table from Lab 1
2. Filter on the symbol you trained on
3. Update the join key for sdf_ml and sdf_grouped

In [8]:
sdf_ml = session.table("") # 1. Update here
sdf_ml = sdf_ml.filter((F.col("SYMBOL") == "")) # 2. Update here
sdf_joined = sdf_ml.join(sdf_grouped, sdf_ml[""] == sdf_grouped[""], rsuffix="_right", how="left") # 3. update here
sdf_joined = sdf_joined.rename(F.col("AVG(VALUE)"), "NEW_FEATURE")
sdf_joined = sdf_joined.drop("DATE_RIGHT")

In [None]:
sdf_joined.show()

# 3.5 Train and Test the Model

TO DO:

1. Update your fillna statement. This fills the entire dataframe, but NEW_FEATURE (the renamed "AVG(VALUE)" column) is the only column with blanks, so you can ignore the type-mismatch warnings for the other columns.

In [10]:
sdf_joined = sdf_joined.na.fill() # 1. Update here
sdf_joined_train = sdf_joined.filter(F.col("DATE") <= '2023-01-01')
sdf_joined_test = sdf_joined.filter(F.col("DATE") > '2023-01-01')

Input value type doesn't match the target column data type, this replacement was skipped. Column Name: "DATE", Type: DateType(), Input Value: 0.05, Type: <class 'float'>
Input value type doesn't match the target column data type, this replacement was skipped. Column Name: "SYMBOL", Type: StringType(16777216), Input Value: 0.05, Type: <class 'float'>
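
The warnings above show that the fill skipped the `DATE` and `SYMBOL` columns because the float fill value did not match their types. A plain-Python sketch of that idea follows. It is not Snowpark's implementation, and the `fill_matching` helper, the schema, and the sample row are hypothetical:

```python
def fill_matching(row, column_types, fill_value):
    """Replace None only in columns whose declared type matches the fill value."""
    filled = dict(row)
    for name, value in row.items():
        if value is None and column_types[name] is type(fill_value):
            filled[name] = fill_value
    return filled


# Illustrative schema and row; NEW_FEATURE is the only missing value.
types = {"DATE": str, "SYMBOL": str, "NEW_FEATURE": float}
row = {"DATE": "2023-01-03", "SYMBOL": "IBM", "NEW_FEATURE": None}
print(fill_matching(row, types, 0.05))
```

Only the float column is filled. A missing value in a string column would be left untouched, which is the behavior the warnings report.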


In [11]:
REGISTRY_DATABASE_NAME = "MODEL_REGISTRY"
REGISTRY_SCHEMA_NAME = "PUBLIC"
native_registry = registry.Registry(session=session, database_name=REGISTRY_DATABASE_NAME, schema_name=REGISTRY_SCHEMA_NAME)

# Train
regressor = LinearRegression
regressor = regressor(input_cols=["CLOSE_M1", "CLOSE_M2", "CLOSE_M3", "CLOSE_M4", "CLOSE_M5"],
                      label_cols=["CLOSE"],
                      output_cols=["CLOSE_PREDICT"])
regressor.fit(sdf_joined_train)

MODEL_NAME = "REGRESSION_IBM"
MODEL_VERSION = "v14"
model = native_registry.log_model(
    model_name=MODEL_NAME,
    version_name=MODEL_VERSION,
    model=regressor,
)

In [None]:
model_ = native_registry.get_model(MODEL_NAME).version(MODEL_VERSION)
model_.run(sdf_joined_test, function_name="predict").limit(10).to_pandas()

In [13]:
model_.run(sdf_joined_test, function_name="predict").write.save_as_table("ML_PREDICT", mode="overwrite")