In [None]:
# Connect to your data instantly with Snowpark
# The session is taken from the notebook's runtime via get_active_session(),
# so no connection parameters or credentials appear in this notebook.
from snowflake.snowpark.context import get_active_session
session = get_active_session()

In [None]:
import json
import warnings; warnings.simplefilter('ignore')

import cachetools
import joblib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns

from snowflake.snowpark.version import VERSION
from snowflake.snowpark.functions import udf
from snowflake.snowpark.types import DecimalType
import snowflake.snowpark.functions as F

import snowflake.ml.modeling.preprocessing as snowml
from snowflake.ml.modeling.pipeline import Pipeline
from snowflake.ml.modeling.xgboost import XGBRegressor
from snowflake.ml.modeling.model_selection import GridSearchCV
from snowflake.ml.modeling.metrics import mean_absolute_percentage_error
from snowflake.ml.registry import Registry
from snowflake.ml._internal.utils import identifier

In [None]:
# Fully qualified name of the source table used throughout this notebook
DIAMONDS_TABLE = 'SAVANNAH_TEST.SAVANNAH_TEST.DIAMONDS'

# Reference the table as a Snowpark DataFrame
diamonds_df = session.read.table(DIAMONDS_TABLE)

# Trailing expression so the notebook renders the DataFrame
diamonds_df

In [None]:
-- Average price for each cut
SELECT cut,
       AVG(price) AS avg_price
  FROM SAVANNAH_TEST.SAVANNAH_TEST.DIAMONDS
 GROUP BY cut;

In [None]:
# `avg_price_per_cut` is not assigned in any Python cell of this notebook.
# NOTE(review): presumably it is the named result of the SQL cell above — confirm.
cut_price_df = avg_price_per_cut.to_pandas()
# Largest of the per-cut average prices
max_price = cut_price_df['AVG_PRICE'].max()
print(f"Maximum average price: ${max_price:,.2f}")


In [None]:
# Exclusive lower bound on price; the next SQL cell interpolates it as {{price_threshold}}
price_threshold = 2000

In [None]:
-- Number of diamonds per cut priced above the notebook variable price_threshold
SELECT cut,
       COUNT(*) AS diamond_count
  FROM SAVANNAH_TEST.SAVANNAH_TEST.DIAMONDS
 WHERE price > {{price_threshold}}
 GROUP BY cut;

In [None]:
import streamlit as st
import altair as alt

# Materialise the Snowpark DataFrame as pandas so Altair can plot it
df = diamonds_df.to_pandas()

st.title("Diamond Carat vs Price Analysis")

# One point per diamond: carat against price, coloured by cut
scatter_encoding = dict(
    x='CARAT',
    y='PRICE',
    color='CUT',
    tooltip=['CARAT', 'PRICE', 'CUT', 'COLOR'],
)
chart = (
    alt.Chart(df)
    .mark_circle()
    .encode(**scatter_encoding)
    .properties(width=600, height=400)
)

st.altair_chart(chart, use_container_width=True)

In [None]:
import streamlit as st
import altair as alt


@st.cache_data
def load_data():
    """Return the diamonds table as a pandas DataFrame (wrapped in st.cache_data)."""
    return diamonds_df.to_pandas()


@st.cache_data
def filter_data(df, cuts, colors, price_min, price_max):
    """Return the rows of ``df`` matching the selected cuts, colors and price range.

    Args:
        df: pandas DataFrame with CUT, COLOR and PRICE columns.
        cuts: CUT values to keep.
        colors: COLOR values to keep.
        price_min: lower PRICE bound passed to ``Series.between``.
        price_max: upper PRICE bound passed to ``Series.between``.
    """
    cut_ok = df['CUT'].isin(cuts)
    color_ok = df['COLOR'].isin(colors)
    price_ok = df['PRICE'].between(price_min, price_max)
    return df[cut_ok & color_ok & price_ok]


# Load the data using the cached function
df = load_data()

st.title("Diamond Analysis Dashboard")

# Values reused by the metrics and the filter widgets below
price_series = df['PRICE']
lowest_price = price_series.min()
highest_price = price_series.max()
cut_options = sorted(df['CUT'].unique())
color_options = sorted(df['COLOR'].unique())

# Quick statistics (computed on the unfiltered data)
col1, col2, col3 = st.columns(3)
col1.metric("Average Price", f"${price_series.mean():,.2f}")
col2.metric("Average Carat", f"{df['CARAT'].mean():.2f}")
col3.metric("Price Range", f"${lowest_price:,} - ${highest_price:,}")

# Filters: everything is selected by default
col1, col2 = st.columns(2)
cut_selection = col1.multiselect("Select Cut", options=cut_options, default=cut_options)
color_selection = col2.multiselect("Select Color", options=color_options, default=color_options)

price_floor = int(lowest_price)
price_ceiling = int(highest_price)
price_range = st.slider(
    "Price Range",
    min_value=price_floor,
    max_value=price_ceiling,
    value=(price_floor, price_ceiling),
)

# Apply filters using the cached function
filtered_df = filter_data(df, cut_selection, color_selection, price_range[0], price_range[1])

# Two side-by-side charts over the filtered rows
col1, col2 = st.columns(2)

chart1 = (
    alt.Chart(filtered_df)
    .mark_boxplot()
    .encode(x='CUT:N', y='PRICE:Q', color='CUT:N')
    .properties(height=300)
)
col1.subheader("Price Distribution by Cut")
col1.altair_chart(chart1, use_container_width=True)

chart2 = (
    alt.Chart(filtered_df)
    .mark_circle()
    .encode(
        x='CARAT:Q',
        y='PRICE:Q',
        color='CUT:N',
        tooltip=['CUT', 'COLOR', 'PRICE', 'CARAT'],
    )
    .properties(height=300)
)
col2.subheader("Carat vs Price")
col2.altair_chart(chart2, use_container_width=True)

# Heatmap of the mean price for every cut/color combination
st.subheader("Average Price by Cut and Color")
mean_price_tooltip = alt.Tooltip('mean(PRICE):Q', format='$,.2f')
avg_price_chart = (
    alt.Chart(filtered_df)
    .mark_rect()
    .encode(
        x='CUT:N',
        y='COLOR:N',
        color=alt.Color('mean(PRICE):Q', scale=alt.Scale(scheme='viridis')),
        tooltip=['CUT', 'COLOR', mean_price_tooltip],
    )
    .properties(height=200)
)
st.altair_chart(avg_price_chart, use_container_width=True)


In [None]:
# Tag the queries issued through this session.
# Session.query_tag is a string property, so the tag is serialised with
# json.dumps instead of relying on an implicit str() of the dict, which would
# yield a single-quoted Python repr rather than valid JSON.
session.query_tag = json.dumps({
    "origin": "sf_sit-is",
    "name": "e2e_ml_snowparkpython",
    "version": {"major": 1, "minor": 0},
    "attributes": {"is_quickstart": 1},
})

# Trailing expression so the notebook displays the session
session

# Feature Transformations
We will illustrate a few of the transformation functions here, but the rest can be found in the documentation.

Let's use the MinMaxScaler to normalize the CARAT column.

In [None]:
# Min-max scale CARAT into a new CARAT_NORM column
snowml_mms = snowml.MinMaxScaler(input_cols=["CARAT"], output_cols=["CARAT_NORM"])
snowml_mms.fit(diamonds_df)
normalized_diamonds_df = snowml_mms.transform(diamonds_df)

# Cast CARAT_NORM to DecimalType(7, 6) to reduce the number of decimals
new_col = normalized_diamonds_df.col("CARAT_NORM").cast(DecimalType(7, 6))
normalized_diamonds_df = normalized_diamonds_df.with_column("CARAT_NORM", new_col)

normalized_diamonds_df

Let's use the OrdinalEncoder to transform CUT and CLARITY from categorical to numerical values so they are more meaningful.

In [None]:
# Ordinal-encode CUT and CLARITY; the listed order of each array defines the ranking
oe_input_cols = ["CUT", "CLARITY"]
categories = dict(
    CUT=np.array(["Fair", "Good", "Very Good", "Premium", "Ideal"]),
    CLARITY=np.array(["IF", "VVS1", "VVS2", "VS1", "VS2", "SI1", "SI2", "I1", "I2", "I3"]),
)

snowml_oe = snowml.OrdinalEncoder(
    input_cols=oe_input_cols,
    output_cols=[f"{name}_OE" for name in oe_input_cols],
    categories=categories,
)

snowml_oe.fit(normalized_diamonds_df)
ord_encoded_diamonds_df = snowml_oe.transform(normalized_diamonds_df)

# Show the encoding
print(snowml_oe._state_pandas)

ord_encoded_diamonds_df


Let's use the OneHotEncoder to transform the categorical columns to numerical columns.
This is more for illustration purposes. Using the OrdinalEncoder makes more sense for the diamonds dataset since CUT, COLOR, and CLARITY all follow a natural ranking order.

In [None]:
# One-hot encode the three categorical columns into *_OHE outputs
ohe_input_cols = ["CUT", "COLOR", "CLARITY"]
snowml_ohe = snowml.OneHotEncoder(
    input_cols=ohe_input_cols,
    output_cols=[f"{name}_OHE" for name in ohe_input_cols],
)
snowml_ohe.fit(ord_encoded_diamonds_df)
transformed_diamonds_df = snowml_ohe.transform(ord_encoded_diamonds_df)

# Display the resulting column names
np.array(transformed_diamonds_df.columns)

Finally, we can also build out a full preprocessing Pipeline.
This will be useful for both the ML training & inference steps to have standardized feature transformations.


In [None]:
# Categorize all the features for processing
CATEGORICAL_COLUMNS = ["CUT", "COLOR", "CLARITY"]
CATEGORICAL_COLUMNS_OE = ["CUT_OE", "COLOR_OE", "CLARITY_OE"] # To name the ordinal encoded columns
# "TAB" is the column name the pipeline and modeling cells use (not "TAB_PCT")
NUMERICAL_COLUMNS = ["CARAT", "DEPTH", "TAB", "X", "Y", "Z"]

# Category labels spelled as in the OrdinalEncoder demo and the pipeline cell
# ("Fair" ... "Ideal"), so the values defined here agree with the rest of the notebook.
categories = {
    "CUT": np.array(["Fair", "Good", "Very Good", "Premium", "Ideal"]),
    "CLARITY": np.array(["IF", "VVS1", "VVS2", "VS1", "VS2", "SI1", "SI2", "I1", "I2", "I3"]),
    "COLOR": np.array(['D', 'E', 'F', 'G', 'H', 'I', 'J']),
}

In [None]:
# Feature groups consumed by the preprocessing pipeline
CATEGORICAL_COLUMNS = ["CUT", "COLOR", "CLARITY"]
CATEGORICAL_COLUMNS_OE = [f"{name}_OE" for name in CATEGORICAL_COLUMNS]
NUMERICAL_COLUMNS = ["CARAT", "DEPTH", "TAB", "X", "Y", "Z"]

# Category labels matching the actual values in the data, in encoding order
categories = dict(
    CUT=np.array(["Fair", "Good", "Very Good", "Premium", "Ideal"]),
    CLARITY=np.array(["IF", "VVS1", "VVS2", "VS1", "VS2", "SI1", "SI2", "I1", "I2", "I3"]),
    COLOR=np.array(['D', 'E', 'F', 'G', 'H', 'I', 'J']),
)

# Step "OE": ordinal-encode the categorical columns into the *_OE columns
ordinal_encoder_step = snowml.OrdinalEncoder(
    input_cols=CATEGORICAL_COLUMNS,
    output_cols=CATEGORICAL_COLUMNS_OE,
    categories=categories,
)

# Step "MMS": min-max scale the numeric columns, writing back to the same column names
min_max_scaler_step = snowml.MinMaxScaler(
    clip=True,
    input_cols=NUMERICAL_COLUMNS,
    output_cols=NUMERICAL_COLUMNS,
)

# Build the pipeline
preprocessing_pipeline = Pipeline(
    steps=[
        ("OE", ordinal_encoder_step),
        ("MMS", min_max_scaler_step),
    ]
)

# The pipeline is serialised before fit() is called, so the saved file holds the
# pipeline definition as constructed above.
PIPELINE_FILE = '/tmp/preprocessing_pipeline.joblib'
joblib.dump(preprocessing_pipeline, PIPELINE_FILE)

# Fit on the full table and preview the transformed output
preprocessing_pipeline.fit(diamonds_df)
transformed_diamonds_df = preprocessing_pipeline.transform(diamonds_df)
transformed_diamonds_df


In [None]:
# Create the DIAMONDS stage unless it already exists
session.sql("CREATE STAGE IF NOT EXISTS DIAMONDS").collect()

# Upload the locally saved pipeline file to the stage, overwriting any existing copy.
# NOTE(review): a later cell downloads 'preprocessing_pipeline.joblib.gz', so the
# upload presumably gzip-compresses the file by default — confirm.
session.file.put(PIPELINE_FILE, "@DIAMONDS", overwrite=True)


In [None]:
# Feature and target column groups used for modeling
CATEGORICAL_COLUMNS = ["CUT", "COLOR", "CLARITY"]
# Ordinal-encoded counterparts: the same names with an "_OE" suffix
CATEGORICAL_COLUMNS_OE = [f"{name}_OE" for name in CATEGORICAL_COLUMNS]
NUMERICAL_COLUMNS = ["CARAT", "DEPTH", "TAB", "X", "Y", "Z"]

# Target column and the column that receives the model's predictions
LABEL_COLUMNS = ["PRICE"]
OUTPUT_COLUMNS = ["PREDICTED_PRICE"]

In [None]:
# Download the staged pipeline file into /tmp and deserialise it, rebinding
# PIPELINE_FILE to the downloaded '.gz' path.
# NOTE(review): joblib.load unpickles the file, so only load artifacts from a trusted stage.
session.file.get('@DIAMONDS/preprocessing_pipeline.joblib.gz', '/tmp')
PIPELINE_FILE = '/tmp/preprocessing_pipeline.joblib.gz'
preprocessing_pipeline = joblib.load(PIPELINE_FILE)

In [None]:
# 90/10 train/test split; the fixed seed keeps the split repeatable
split_weights = [0.9, 0.1]
diamonds_train_df, diamonds_test_df = diamonds_df.random_split(weights=split_weights, seed=0)

# Fit the preprocessing pipeline on the training split only, then apply the
# fitted pipeline to both splits
preprocessing_pipeline.fit(diamonds_train_df)
train_df = preprocessing_pipeline.transform(diamonds_train_df)
test_df = preprocessing_pipeline.transform(diamonds_test_df)

In [None]:
# Model inputs: the ordinal-encoded categoricals followed by the numeric columns
feature_columns = CATEGORICAL_COLUMNS_OE + NUMERICAL_COLUMNS

# Baseline XGBRegressor with default hyperparameters
regressor = XGBRegressor(
    input_cols=feature_columns,
    label_cols=LABEL_COLUMNS,
    output_cols=OUTPUT_COLUMNS,
)

# Train on the preprocessed training split
regressor.fit(train_df)

# Score the preprocessed test split
result = regressor.predict(test_df)

In [None]:
# Same model, but scoring a pandas DataFrame of the feature columns instead of
# the Snowpark DataFrame; the returned predictions are shown as the cell output.
regressor.predict(test_df[CATEGORICAL_COLUMNS_OE+NUMERICAL_COLUMNS].to_pandas())


In [None]:
# Mean absolute percentage error of the baseline regressor's predictions in `result`
mape = mean_absolute_percentage_error(df=result, 
                                        y_true_col_names="PRICE", 
                                        y_pred_col_names="PREDICTED_PRICE")

# Display actual and predicted prices side by side
result.select("PRICE", "PREDICTED_PRICE")

In [None]:
# Report the MAPE computed in the previous cell
print(f"Mean absolute percentage error: {mape}")


In [None]:
# Actual vs predicted price as a scatter plot
price_pairs = result["PRICE", "PREDICTED_PRICE"].to_pandas().astype("float64")
g = sns.relplot(data=price_pairs, x="PRICE", y="PREDICTED_PRICE", kind="scatter")

# Red reference line through the origin with slope 1 (predicted == actual)
g.ax.axline((0,0), slope=1, color="r")

plt.show()

In [None]:
# Hyperparameter grid: 5 values of n_estimators x 5 values of learning_rate
param_grid = {
    "n_estimators": [100, 200, 300, 400, 500],
    "learning_rate": [0.1, 0.2, 0.3, 0.4, 0.5],
}

# Grid search over XGBRegressor, scored by negated MAPE
grid_search = GridSearchCV(
    estimator=XGBRegressor(),
    param_grid=param_grid,
    n_jobs=-1,
    scoring="neg_mean_absolute_percentage_error",
    input_cols=CATEGORICAL_COLUMNS_OE + NUMERICAL_COLUMNS,
    label_cols=LABEL_COLUMNS,
    output_cols=OUTPUT_COLUMNS,
)

# Train
grid_search.fit(train_df)

In [None]:
# Analyze grid search results
gs_results = grid_search.to_sklearn().cv_results_
searched_params = gs_results["params"]

# One entry per evaluated parameter combination
n_estimators_val = [combo["n_estimators"] for combo in searched_params]
learning_rate_val = [combo["learning_rate"] for combo in searched_params]

# The scorer is negated MAPE, so flip the sign to get MAPE back
mape_val = gs_results["mean_test_score"]*-1

gs_results_df = pd.DataFrame(
    data={
        "n_estimators": n_estimators_val,
        "learning_rate": learning_rate_val,
        "mape": mape_val,
    }
)

# MAPE against learning rate, one line per n_estimators value
sns.relplot(data=gs_results_df, x="learning_rate", y="mape", hue="n_estimators", kind="line")

plt.show()

In [None]:
# Predict
# NOTE: this rebinds `result`, which an earlier cell set to the baseline
# regressor's predictions.
result = grid_search.predict(test_df)

# Analyze results
# NOTE: `mape` is overwritten here as well; from this cell onward it describes
# the grid-search predictions, not the baseline regressor.
mape = mean_absolute_percentage_error(df=result, 
                                        y_true_col_names="PRICE", 
                                        y_pred_col_names="PREDICTED_PRICE")

result.select("PRICE", "PREDICTED_PRICE").show()
print(f"Mean absolute percentage error: {mape}")

In [None]:
# Plot actual vs predicted for the current contents of `result`
price_pairs = result["PRICE", "PREDICTED_PRICE"].to_pandas().astype("float64")
g = sns.relplot(data=price_pairs, x="PRICE", y="PREDICTED_PRICE", kind="scatter")

# Red reference line through the origin with slope 1 (predicted == actual)
g.ax.axline((0,0), slope=1, color="r")

plt.show()

In [None]:
# Convert the fitted grid search to its scikit-learn object once and reuse it,
# instead of calling to_sklearn() for every attribute.
sklearn_grid_search = grid_search.to_sklearn()

# Best estimator found by the search and its tuned hyperparameters
optimal_model = sklearn_grid_search.best_estimator_
optimal_n_estimators = optimal_model.n_estimators
optimal_learning_rate = optimal_model.learning_rate

# best_score_ is the mean cross-validated score of the best parameter
# combination. The scorer is "neg_mean_absolute_percentage_error", so negate it
# to get MAPE. Reading it here avoids a float-equality lookup in the results
# DataFrame built by another cell, which raises IndexError when nothing matches.
optimal_mape = -sklearn_grid_search.best_score_

In [None]:
# Get sample input data to pass into the registry logging function
X = train_df.select(CATEGORICAL_COLUMNS_OE+NUMERICAL_COLUMNS).limit(100)

db = identifier._get_unescaped_name(session.get_current_database())
schema = identifier._get_unescaped_name(session.get_current_schema())

# Define model name
model_name = "DIAMONDS_PRICE_PREDICTION"

# Create a registry and log the model
native_registry = Registry(session=session, database_name=db, schema_name=schema)

# `mape` was reassigned by the grid-search evaluation cell, so by now it no
# longer describes `regressor`. Re-score the baseline model on the test split
# so that version V0 is logged with its own metric.
baseline_mape = mean_absolute_percentage_error(
    df=regressor.predict(test_df),
    y_true_col_names="PRICE",
    y_pred_col_names="PREDICTED_PRICE",
)

# Let's first log the very first model we trained
model_ver = native_registry.log_model(
    model_name=model_name,
    version_name='V0',
    model=regressor,
    sample_input_data=X, # to provide the feature schema
    options={"enable_explainability": True}
)

# Add evaluation metric (test-split MAPE of the baseline regressor)
model_ver.set_metric(metric_name="mean_abs_pct_err", value=baseline_mape)

# Add a description
model_ver.comment = "This is the first iteration of our Diamonds Price Prediction model. It is used for demo purposes."

# Now, let's log the optimal model from GridSearchCV
model_ver2 = native_registry.log_model(
    model_name=model_name,
    version_name='V1',
    model=optimal_model,
    sample_input_data=X, # to provide the feature schema
    options={"enable_explainability": True}
)

# Add evaluation metric (cross-validated MAPE of the best parameter combination)
model_ver2.set_metric(metric_name="mean_abs_pct_err", value=optimal_mape)

# Add a description. Adjacent literals are used instead of a backslash
# continuation inside the string, which embedded the next line's indentation
# in the stored comment.
model_ver2.comment = (
    "This is the second iteration of our Diamonds Price Prediction model "
    "where we performed hyperparameter optimization."
)

In [None]:
# NOTE(review): prints a fixed string and nothing else — looks like a leftover scratch cell; consider removing.
print('test')