### Data Engineering Pipelines with Snowflake Notebooks

What You'll Learn
How to ingest custom file formats (like Excel) with Snowpark from an external stage (such as an S3 bucket) into a Snowflake table
How to access data from Snowflake Marketplace and use it for your analysis
How to use Snowflake Notebooks and the Snowpark DataFrame API to build data engineering pipelines
How to add logging to your Python data engineering code and monitor from within Snowsight
How to execute SQL scripts from your Git repository directly in Snowflake
How to use open-source Python libraries from the curated Snowflake Anaconda channel
How to use the Snowflake Python Management API to programmatically work with Snowflake objects
How to use the Python Task DAG API to programmatically manage Snowflake Tasks
How to build CI/CD pipelines using Snowflake's Git Integration, the Snowflake CLI, and GitHub Actions
How to deploy Snowflake Notebooks from dev to production

Source Data:

Tutorial: https://quickstarts.snowflake.com/guide/data_engineering_with_notebooks/index.html?index=..%2F..index#0

In [None]:
# Imports
from datetime import timedelta

from snowflake.snowpark.context import get_active_session
from snowflake.core import Root
# import snowflake.snowpark.functions as F
from snowflake.core.task.dagv1 import DAGOperation, DAG, DAGTask, DAGRun

In [None]:
-- Set DDL: provision the warehouse, database, schemas, external stage and
-- event table that the rest of this notebook relies on.

-- Warehouse: extra-small, suspends after 300 s idle, resumes on demand
CREATE OR REPLACE WAREHOUSE TEST_WH
    WAREHOUSE_SIZE = XSMALL
    AUTO_SUSPEND = 300
    AUTO_RESUME = TRUE;
USE WAREHOUSE TEST_WH;

-- Database: INFO log level so INFO-and-above log records are captured
CREATE OR ALTER DATABASE data_engineering_with_snowflake_notebooks;
ALTER DATABASE data_engineering_with_snowflake_notebooks SET LOG_LEVEL = INFO;
USE DATABASE data_engineering_with_snowflake_notebooks;

-- Schemas: INTEGRATIONS holds the stage/secret/repo objects; DEV is the
-- deploy target used by the cells below
CREATE OR REPLACE SCHEMA INTEGRATIONS;
CREATE OR REPLACE SCHEMA DEV;

-- External stage over the public quickstart S3 bucket
USE SCHEMA INTEGRATIONS;
CREATE OR REPLACE STAGE FROSTBYTE_RAW_STAGE
    URL = 's3://sfquickstarts/data-engineering-with-snowpark-python/';

-- Event table, registered as the account's active event table
-- (setting EVENT_TABLE at account level typically needs ACCOUNTADMIN)
CREATE EVENT TABLE data_engineering_with_snowflake_notebooks.INTEGRATIONS.EVENTS;
ALTER ACCOUNT SET EVENT_TABLE = data_engineering_with_snowflake_notebooks.INTEGRATIONS.EVENTS;

In [None]:
-- Set GitHub: connect Snowflake to a GitHub repository so SQL scripts can be
-- executed straight from it.
-- TODO: replace the two '<...>' placeholders below before running this cell.

SET GITHUB_USERNAME = '<your-github-username>';  -- TODO: your GitHub username
SET GITHUB_URL = 'https://github.com';
SET GITHUB_REPO = 'util_lib';  -- repo holding the helper scripts
SET GITHUB_URL_PREFIX = $GITHUB_URL || '/' || $GITHUB_USERNAME;
SET GITHUB_REPO_ORIGIN = $GITHUB_URL || '/' || $GITHUB_USERNAME || '/' || $GITHUB_REPO;

-- Secrets (schema level)
-- Use a GitHub personal access token as the password; avoid committing a
-- notebook that still contains the real token.
USE SCHEMA INTEGRATIONS;
CREATE OR REPLACE SECRET GITHUB_SECRET
  TYPE = password
  USERNAME = $GITHUB_USERNAME
  PASSWORD = '<your-github-personal-access-token>';  -- TODO: your GitHub PAT

-- API Integration (account level)
CREATE OR REPLACE API INTEGRATION GITHUB_API_INTEGRATION
  API_PROVIDER = GIT_HTTPS_API
  API_ALLOWED_PREFIXES = ($GITHUB_URL_PREFIX)  -- parentheses required
  ALLOWED_AUTHENTICATION_SECRETS = (GITHUB_SECRET)  -- parentheses required
  ENABLED = TRUE;

-- Git Repository (created in INTEGRATIONS as GIT_REPO)
CREATE OR REPLACE GIT REPOSITORY GIT_REPO
  API_INTEGRATION = GITHUB_API_INTEGRATION
  GIT_CREDENTIALS = GITHUB_SECRET
  ORIGIN = $GITHUB_REPO_ORIGIN;

### Load Weather

Connect to the "Weather Source LLC: frostbyte" feed from Weather Source in the Snowflake Marketplace by following these steps:

    -> Snowsight Home Button
         -> Marketplace
             -> Search: "Weather Source LLC: frostbyte" (and click on tile in results)
                 -> Click the blue "Get" button
                     -> Under "Options", adjust the Database name to read "FROSTBYTE_WEATHERSOURCE" (all capital letters)
                        -> Grant to "AccountAdmin"
    
That's it... we don't have to do anything from here to keep this data updated.
The provider will do that for us and data sharing means we are always seeing
whatever they have published.

In [None]:
-- Deploy external notebooks
-- Runs deploy_notebooks.sql from the main branch of the Git repository object
-- created in the GitHub cell (INTEGRATIONS.GIT_REPO). The USING values are
-- handed to that script as template variables; each name is bound only once.

EXECUTE IMMEDIATE FROM @data_engineering_with_snowflake_notebooks.INTEGRATIONS.GIT_REPO/branches/main/deploy_notebooks.sql
    USING (
        notebook => 'load_excel_files',
        source_wh => 'TEST_WH',
        source_db => 'data_engineering_with_snowflake_notebooks',
        source_schema => 'INTEGRATIONS',
        source_repo => 'data_engineering_with_snowflake_notebooks',
        source_branch => 'DEV',
        source_directory => 'notebooks',
        target_db => 'data_engineering_with_snowflake_notebooks',
        target_schema => 'DEV'
    );


In [None]:
# Handler source for DAILY_CITY_METRICS_UPSERT_SP. The CREATE PROCEDURE cell
# splices this string into the procedure body via {{daily_city_metrics_upsert}}.
# NOTE: this is a regular (non-raw) string literal, so the embedded code avoids
# backslash line continuations -- a backslash-newline would be swallowed by
# this literal instead of reaching the procedure.
daily_city_metrics_upsert = """

import snowflake.snowpark.functions as F

TABLE_NAME = 'DAILY_CITY_METRICS'
WAREHOUSE_NAME = 'TEST_WH'


def _build_staging_df(session):
    # Join orders to locations and daily weather, then aggregate per city and day.
    order_detail_df = session.table('ORDER_DETAIL')
    history_day_df = session.table('FROSTBYTE_WEATHERSOURCE.ONPOINT_ID.HISTORY_DAY')
    location_df = session.table('LOCATION')

    joined_df = order_detail_df.join(
        location_df,
        order_detail_df['LOCATION_ID'] == location_df['LOCATION_ID'],
    )
    joined_df = joined_df.join(
        history_day_df,
        (F.builtin('DATE')(joined_df['ORDER_TS']) == history_day_df['DATE_VALID_STD'])
        & (location_df['ISO_COUNTRY_CODE'] == history_day_df['COUNTRY'])
        & (location_df['CITY'] == history_day_df['CITY_NAME']),
    )

    return (
        joined_df
        .group_by(F.col('DATE_VALID_STD'), F.col('CITY_NAME'), F.col('ISO_COUNTRY_CODE'))
        .agg(
            F.sum('PRICE').alias('DAILY_SALES_SUM'),
            F.avg('AVG_TEMPERATURE_AIR_2M_F').alias('AVG_TEMPERATURE_F'),
            F.avg('TOT_PRECIPITATION_IN').alias('AVG_PRECIPITATION_IN'),
        )
        .select(
            F.col('DATE_VALID_STD').alias('DATE'),
            F.col('CITY_NAME'),
            F.col('ISO_COUNTRY_CODE').alias('COUNTRY_DESC'),
            F.builtin('ZEROIFNULL')(F.col('DAILY_SALES_SUM')).alias('DAILY_SALES'),
            F.round(F.col('AVG_TEMPERATURE_F'), 2).alias('AVG_TEMPERATURE_FAHRENHEIT'),
            F.round(F.col('AVG_PRECIPITATION_IN'), 2).alias('AVG_PRECIPITATION_INCHES'),
        )
    )


def _table_exists(session, schema, table):
    # True if `table` exists in `schema` (or in the session's current schema
    # when schema is None) according to INFORMATION_SCHEMA.TABLES.
    if schema is None:
        schema_expr = 'CURRENT_SCHEMA()'
    else:
        schema_expr = "'" + schema.replace("'", "''") + "'"
    table_literal = "'" + table.replace("'", "''") + "'"
    query = (
        'SELECT EXISTS ('
        ' SELECT * FROM INFORMATION_SCHEMA.TABLES'
        f' WHERE TABLE_SCHEMA = {schema_expr}'
        f' AND TABLE_NAME = {table_literal}'
        ') AS TABLE_EXISTS'
    )
    return session.sql(query).collect()[0]['TABLE_EXISTS']


def main(session, schema=None):
    # Stored-procedure handler: create DAILY_CITY_METRICS on first run,
    # otherwise MERGE the freshly aggregated rows into it.
    # `schema` is optional so the zero-argument DAILY_CITY_METRICS_UPSERT_SP()
    # can call this handler; None means the session's current schema.
    # Returns a short status string (the procedure RETURNS string).
    target = TABLE_NAME if schema is None else schema + '.' + TABLE_NAME

    # Temporarily increase warehouse size; the finally block restores it even
    # when the load fails.
    session.sql(
        f'ALTER WAREHOUSE {WAREHOUSE_NAME} SET WAREHOUSE_SIZE = XLARGE WAIT_FOR_COMPLETION = TRUE'
    ).collect()
    try:
        staging_df = _build_staging_df(session)

        if not _table_exists(session, schema, TABLE_NAME):
            # Create the table with META_UPDATED_AT so later MERGEs can set it.
            (
                staging_df
                .with_column('META_UPDATED_AT', F.current_timestamp())
                .write.mode('overwrite')
                .save_as_table(target)
            )
            return 'created ' + target

        # Upsert on the natural key (DATE, CITY_NAME, COUNTRY_DESC), stamping
        # META_UPDATED_AT on every written row.
        target_df = session.table(target)
        records = {name: staging_df[name] for name in staging_df.schema.names}
        records['META_UPDATED_AT'] = F.current_timestamp()
        target_df.merge(
            staging_df,
            (target_df['DATE'] == staging_df['DATE'])
            & (target_df['CITY_NAME'] == staging_df['CITY_NAME'])
            & (target_df['COUNTRY_DESC'] == staging_df['COUNTRY_DESC']),
            [F.when_matched().update(records), F.when_not_matched().insert(records)],
        )
        return 'merged into ' + target
    finally:
        # Return warehouse to XSMALL, the size set in the setup cell
        session.sql(f'ALTER WAREHOUSE {WAREHOUSE_NAME} SET WAREHOUSE_SIZE = XSMALL').collect()
"""

In [None]:
-- Create DAILY_CITY_METRICS_UPSERT_SP
-- The handler source is the Python variable `daily_city_metrics_upsert`,
-- substituted into the body by the notebook's {{ }} variable reference.
-- The procedure takes no arguments; Snowflake calls main(session).
CREATE OR REPLACE PROCEDURE DAILY_CITY_METRICS_UPSERT_SP()
    RETURNS STRING
    LANGUAGE PYTHON
    RUNTIME_VERSION = '3.9'
    PACKAGES = ('snowflake-snowpark-python')
    HANDLER = 'main'
    AS $$
        {{daily_city_metrics_upsert}}
    $$;

In [None]:

# Build the daily pipeline DAG: load both Excel worksheets with the
# load_excel_files notebook, then upsert DAILY_CITY_METRICS once both loads finish.

# Session and management-API root. They were referenced here without ever
# being defined in this notebook.
session = get_active_session()
root = Root(session)

warehouse_name = 'TEST_WH'
database_name = 'data_engineering_with_snowflake_notebooks'
# `env` may be supplied by an earlier cell / notebook parameter; otherwise
# target DEV, the schema the deploy cell installs the notebooks into.
schema_name = globals().get('env', 'DEV')
schema = root.databases[database_name].schemas[schema_name]
dag_op = DAGOperation(schema)


# Create the DAG: runs once a day on the tutorial warehouse
dag_obj = DAG(
    name='DAG',
    schedule=timedelta(days=1),
    warehouse=warehouse_name,
)

# Parameters for each load_excel_files run (one per worksheet / target table)
tables = ['order_detail', 'location']
task_params = []
for tname in tables:
    params = {
        'source_schema': 'INTEGRATIONS',
        'source_table': 'FROSTBYTE_RAW_STAGE',
        'source_directory': 'intro',
        'source_file': f"{tname}.xlsx",
        'source_worksheet': tname,

        'target_schema': 'INTEGRATIONS',
        'target_table': tname,
    }
    task_params.append(params)


def _execute_notebook_sql(notebook_params):
    """Return an EXECUTE NOTEBOOK statement running load_excel_files.

    DAGTask does not accept an `args` keyword (per the snowflake.core DAGTask
    API), so the per-table settings are passed as notebook arguments of the
    form 'key=value', with single quotes doubled for SQL.
    NOTE(review): load_excel_files must read these from sys.argv -- confirm.
    """
    notebook_args = ', '.join(
        "'" + f"{key}={value}".replace("'", "''") + "'"
        for key, value in notebook_params.items()
    )
    return (
        f'EXECUTE NOTEBOOK "{database_name}"."{schema_name}"."load_excel_files"'
        f'({notebook_args})'
    )


# Define the DAG
with dag_obj as dag:
    order_detail_excel_task = DAGTask(
        name="order_detail_excel_DAGTASK",
        definition=_execute_notebook_sql(task_params[0]),
        warehouse=warehouse_name)
    location_excel_task = DAGTask(
        name="location_excel_DAGTASK",
        definition=_execute_notebook_sql(task_params[1]),
        warehouse=warehouse_name)
    # DAILY_CITY_METRICS_UPSERT_SP is declared without arguments.
    daily_city_metrics_upsert_sp_dagtask = DAGTask(
        name="daily_city_metrics_upsert_sp_dagtask",
        definition=f'''CALL "{database_name}"."{schema_name}"."DAILY_CITY_METRICS_UPSERT_SP"()''',
        warehouse=warehouse_name)

    # Define task precedence: the upsert waits for both Excel loads
    order_detail_excel_task >> daily_city_metrics_upsert_sp_dagtask
    location_excel_task >> daily_city_metrics_upsert_sp_dagtask

In [None]:
# Deploy the DAG in Snowflake
# mode="orreplace" replaces an already-deployed DAG of the same name instead
# of raising because it exists.
dag_op.deploy(dag, mode="orreplace")

In [None]:
# List every DAG deployed in the schema that dag_op manages
for deployed_dag in dag_op.iter_dags():
    print(deployed_dag)

In [None]:
# Trigger dag to run once
# An on-demand run, separate from the DAG's daily schedule.
dag_op.run(dag)

In [None]:
# View completed runs from the past hour.
# get_complete_dag_runs defaults to error_only=True (failed runs only); pass
# False so successful runs are listed too.
dag_op.get_complete_dag_runs(dag, error_only=False)

In [None]:
# View current runs and next scheduled
# Shows in-progress runs of this DAG, or its next scheduled run when none is active.
dag_op.get_current_dag_runs(dag)

In [None]:
# Inspect the run records of this DAG.
# DAGRun objects are returned by DAGOperation queries; constructing an empty
# DAGRun() (as this cell previously did) does not fetch any run information.
dag_runs = dag_op.get_current_dag_runs(dag)
for dag_run in dag_runs:
    print(dag_run)

In [None]:
# Drop the DAG in Snowflake
# Removes the deployed DAG; the run/inspection cells above will no longer
# find it until it is deployed again.
dag_op.drop(dag)

In [None]:
-- View Logs: the 100 most recent records in the event table registered
-- during setup, newest first.
-- To narrow to log records only, filter on RECORD_TYPE = 'LOG'.
SELECT
    TIMESTAMP,
    RECORD_TYPE,
    SCOPE['name'] AS SCOPE_NAME,
    RECORD['severity_text'] AS SEVERITY,
    VALUE AS MESSAGE
FROM data_engineering_with_snowflake_notebooks.INTEGRATIONS.EVENTS
ORDER BY TIMESTAMP DESC
LIMIT 100;

In [None]:
-- Teardown: remove everything the setup cells created.

-- Unset the account-level event table first; otherwise the account keeps
-- pointing at a table that is about to be dropped with the database.
ALTER ACCOUNT UNSET EVENT_TABLE;

-- Database (schemas, stage, secret, Git repo, procedures and DAG tasks with it)
DROP DATABASE IF EXISTS data_engineering_with_snowflake_notebooks;

-- Account-level objects are not removed by dropping the database
DROP API INTEGRATION IF EXISTS GITHUB_API_INTEGRATION;
DROP WAREHOUSE IF EXISTS TEST_WH;


