# Snowpark For Python - 광고 지출 및 ROI 예측

### 목적

이 세션에서는 Snowpark for Python 및 scikit-learn을 사용하여 검색, 동영상, 소셜 미디어, 이메일 등 여러 채널에 걸쳐 다양한 광고 비용 예산의 미래 ROI (투자 수익률)를 예측하는 선형 회귀 모델 훈련합니다.

이 노트북에서는 다음을 수행합니다.

* Session 객체를 만들고 Snowflake에 안전하게 연결
* Snowflake 테이블에서 Snowpark DataFrame으로 데이터를로드합니다.
* Snowpark DataFrame에서 검색 데이터 분석(EDA) 수행
* 데이터 세트 피벗과 결합
* Python 저장 프로 시저를 만들어 모델 교육 코드를 Snowflake에 배포합니다.
* 사용자 입력을 기반으로 새 데이터 포인트를 추론하기 위한 Python 스칼라 및 벡터화된 사용자 정의 함수(UDF)를 만듭니다.
    * *참고: Scalar UDF는 Streamlit 앱에서 호출됩니다. [Snowpark_Streamlit_Revenue_Prediction.py](Snowpark_Streamlit_Revenue_Prediction.py)*를 참조하십시오.
* Snowflake 작업을 생성하여 데이터 파이프라인 및 모델(재) 교육을 자동화합니다.

### 전제 조건

  - ACCOUNTADMIN 역할이 있는 Snowflake 계정
    - 단일 브라우저 탭에서 계정(ORGADMIN 권한이 있는 역할)에서 만든 관리자 자격 증명을 사용하여 Snowflake 평가판 계정(https://app.snowflake.com/)에 로그인합니다. 워크숍 중에는 이 탭을 열어 두십시오.
    - 왼쪽 패널에서 **결제**를 클릭합니다.
    - 이용약관 및 청구(https://app.snowflake.com/terms-and-billing)를 클릭합니다.
    - 워크숍을 계속하려면 약관을 읽고 동의하세요.
  - 파이썬 3.8
  - Conda 환경을 만들고 활성화(또는 Python 3.8이 있는 다른 Python 환경 사용)
    - conda create --name Snowpark -c https://repo.anaconda.com/pkgs/snowflake python=3.8
    - conda는 스노우 파크를 활성화합니다.
  - Conda 환경에 Python, Streamlit 및 기타 라이브러리에 대한 Snowpark 설치
    - conda install -c https://repo.anaconda.com/pkgs/snowflake Snowflake-snowpark-python pandas 노트북 scikit-learn 캐시 도구 streamlit
  - 데이터 준비
    - 여기에 설명 된 단계에 따라 필요한 테이블을 만들고 데이터를로드합니다. - https://github.com/Snowflake-Labs/snowpark-python-demos/tree/main/Advertising-Spend- ROI-Prediction#setup
  - [connection.json](connection.json)을 Snowflake 계정 세부 정보 및 자격 증명으로 업데이트합니다.
    - 참고 : account 매개 변수는 계정 식별자 (https://docs.snowflake.com/en/user-guide/admin-account-identifier.html)를 지정하고 _snowflakecomputing.com_ 도메인 이름은 포함하지 마십시오. 제발. Snowflake는 연결을 만들 때 자동으로 추가합니다.

_댓글과 의견은 dash.desai@snowflake.com으로 문의하십시오. [Twitter] (https://twitter.com/iamontheinet)_를 팔로우하십시오.

<div style='text-align: center'>
    <img src="assets/snowpark.png" alt="Snowpark" style="width: 75%;"/>
</div>

### Import Libraries

In [1]:
import json
import logging

import pandas as pd
from snowflake.snowpark.functions import (array_construct, call_udf, col, lit,
                                          month, sum, udf, year)
from snowflake.snowpark.session import Session
from snowflake.snowpark.types import (DateType, FloatType, IntegerType,
                                      StringType, StructField, StructType,
                                      Variant)
from snowflake.snowpark.version import VERSION

logger = logging.getLogger("snowflake.snowpark.session")
logger.setLevel(logging.ERROR)

### Snowflake에 대한 안전한 연결 설정

Snowpark API를 사용하면 Snowflake와 Notebook 간에 안전한 연결을 빠르고 쉽게 설정할 수 있습니다.

 *Connection options: Username/Password, MFA, OAuth, Okta, SSO*

In [4]:
# Create Snowflake Session object
connection_parameters = json.load(open('connection.json'))
session = Session.builder.configs(connection_parameters).create()
session.sql_simplifier_enabled = True

Create objects to use for this demonstration.

In [5]:
session.sql(
    """CREATE WAREHOUSE IF NOT EXISTS SNOWPARK_DEMO_WH 
           WITH WAREHOUSE_SIZE = 'MEDIUM' 
                WAREHOUSE_TYPE = 'STANDARD' 
                AUTO_SUSPEND = 60 
                AUTO_RESUME = TRUE 
                INITIALLY_SUSPENDED = TRUE;"""
).collect()
session.sql("CREATE DATABASE IF NOT EXISTS SNOWPARK_ROI_DEMO;").collect()
session.sql("DROP SCHEMA IF EXISTS SNOWPARK_ROI_DEMO.PUBLIC;").collect()
session.sql("CREATE SCHEMA IF NOT EXISTS SNOWPARK_ROI_DEMO.AD_DATA;").collect()
session.sql(
    """CREATE FILE FORMAT IF NOT EXISTS SNOWPARK_ROI_DEMO.AD_DATA.CSVFORMAT 
           SKIP_HEADER = 1 
           TYPE = 'CSV';"""
).collect()
session.sql(
    """CREATE STAGE IF NOT EXISTS SNOWPARK_ROI_DEMO.AD_DATA.CAMPAIGN_DATA_STAGE 
           FILE_FORMAT = SNOWPARK_ROI_DEMO.AD_DATA.CSVFORMAT  
           URL = 's3://sfquickstarts/Summit 2022 Keynote Demo/campaign_spend/';"""
).collect()
session.sql(
    """CREATE STAGE IF NOT EXISTS SNOWPARK_ROI_DEMO.AD_DATA.MONTHLY_REVENUE_DATA_STAGE 
           FILE_FORMAT = SNOWPARK_ROI_DEMO.AD_DATA.CSVFORMAT  
           URL = 's3://sfquickstarts/Summit 2022 Keynote Demo/monthly_revenue/';"""
).collect()
session.sql("CREATE OR REPLACE STAGE SNOWPARK_ROI_DEMO.AD_DATA.PYTHON_MODELS").collect()
session.sql("CREATE OR REPLACE STAGE SNOWPARK_ROI_DEMO.AD_DATA.PYTHON_CODE").collect()

[Row(status='Stage area PYTHON_CODE successfully created.')]

Change the current context.

In [6]:
session.use_warehouse("SNOWPARK_DEMO_WH")
session.use_database("SNOWPARK_ROI_DEMO")
session.use_schema("AD_DATA")

In [7]:
snowflake_environment = session.sql('select current_user(), current_role(), current_database(), current_schema(), current_version(), current_warehouse()').collect()
snowpark_version = VERSION

# Current Environment Details
print('User                        : {}'.format(snowflake_environment[0][0]))
print('Role                        : {}'.format(snowflake_environment[0][1]))
print('Database                    : {}'.format(snowflake_environment[0][2]))
print('Schema                      : {}'.format(snowflake_environment[0][3]))
print('Warehouse                   : {}'.format(snowflake_environment[0][5]))
print('Snowflake version           : {}'.format(snowflake_environment[0][4]))
print('Snowpark for Python version : {}.{}.{}'.format(snowpark_version[0],snowpark_version[1],snowpark_version[2]))

User                        : VKSK0258
Role                        : ACCOUNTADMIN
Database                    : SNOWPARK_ROI_DEMO
Schema                      : AD_DATA
Warehouse                   : SNOWPARK_DEMO_WH
Snowflake version           : 7.21.1
Snowpark for Python version : 0.11.0


View the files in the external stage.

In [8]:
session.sql("LS @CAMPAIGN_DATA_STAGE").collect()

[Row(name='s3://sfquickstarts/Summit 2022 Keynote Demo/campaign_spend/campaign_spend.csv', size=13684943, md5='1d87f70421662a7666d3918b16b81daa', last_modified='Fri, 5 Aug 2022 20:22:18 GMT')]

### 집계 된 캠페인 지출 데이터를 Snowflake 테이블에서 Snowpark DataFrame으로로드합니다.

먼저 캠페인 지출 데이터를 로드합니다. 이 표에는 검색 엔진, 소셜 미디어, 이메일, 동영상과 같은 전체 디지털 광고 채널의 일일 지출을 보여주기 위해 집계된 광고 클릭 데이터가 포함되어 있습니다.

NOTE: Ways to load data in a Snowpark Dataframe
* session.table("db.schema.table")
* session.sql("select col1, col2... from tableName")
* session.read.parquet("@stageName/path/to/file")
* session.create_dataframe([1,2,3], schema=["col1"])

TIP: For more information on Snowpark DataFrames, refer to the [docs](https://docs.snowflake.com/en/developer-guide/snowpark/reference/python/_autosummary/snowflake.snowpark.html#snowflake.snowpark.DataFrame).


Query and preview the CSV file in the stage.

In [9]:
df = session.sql(
    """SELECT $1::VARCHAR(60) AS CAMPAIGN, 
              $2::VARCHAR(60) AS CHANNEL, 
              $3::DATE AS DATE, 
              $4::NUMBER(38, 0) AS TOTAL_CLICKS, 
              $5::NUMBER(38, 0) AS TOTAL_COST, 
              $6::NUMBER(38, 0) AS ADS_SERVED 
       FROM @CAMPAIGN_DATA_STAGE""")

df.show()

------------------------------------------------------------------------------------------------------
|"CAMPAIGN"              |"CHANNEL"      |"DATE"      |"TOTAL_CLICKS"  |"TOTAL_COST"  |"ADS_SERVED"  |
------------------------------------------------------------------------------------------------------
|winter_sports           |video          |2012-06-03  |213             |1762          |426           |
|sports_across_cultures  |video          |2012-06-02  |87              |678           |157           |
|building_community      |search_engine  |2012-06-03  |66              |471           |134           |
|world_series            |social_media   |2017-12-28  |72              |591           |149           |
|winter_sports           |email          |2018-02-09  |252             |1841          |473           |
|spring_break            |video          |2017-11-14  |162             |1155          |304           |
|nba_finals              |email          |2017-11-22  |68              |4

Write this DataFrame to a Snowflake table named `CAMPAIGN_SPEND`. 

In [10]:
df.write.save_as_table("CAMPAIGN_SPEND", mode="overwrite")

In [11]:
snow_df_spend = session.table('campaign_spend')
snow_df_spend.queries

{'queries': ['SELECT  *  FROM (campaign_spend)'], 'post_actions': []}

<div style='text-align: center'>
    <img src="assets/snowpark_python_api.png" alt="Snowpark" style="width: 75%;"/>
</div>

In [12]:
# Action sends the DF SQL for execution
# Note: history object provides the query ID which can be helpful for debugging as well as the SQL query executed on the server
with session.query_history() as history:
    snow_df_spend.show()
history.queries

------------------------------------------------------------------------------------------------------
|"CAMPAIGN"              |"CHANNEL"      |"DATE"      |"TOTAL_CLICKS"  |"TOTAL_COST"  |"ADS_SERVED"  |
------------------------------------------------------------------------------------------------------
|winter_sports           |video          |2012-06-03  |213             |1762          |426           |
|sports_across_cultures  |video          |2012-06-02  |87              |678           |157           |
|building_community      |search_engine  |2012-06-03  |66              |471           |134           |
|world_series            |social_media   |2017-12-28  |72              |591           |149           |
|winter_sports           |email          |2018-02-09  |252             |1841          |473           |
|spring_break            |video          |2017-11-14  |162             |1155          |304           |
|nba_finals              |email          |2017-11-22  |68              |4

[QueryRecord(query_id='01ad381f-0000-339b-0000-65f90001557e', sql_text='SELECT  *  FROM campaign_spend LIMIT 10')]

### 월별 채널당 총 지출

_group_by()_ 및 _agg()_ Snowpark DataFrame 함수를 사용하여 채널당 연간/월 총 비용을 볼 수 있도록 데이터를 변환해 보겠습니다.

TIP: For a full list of functions, refer to the [docs](https://docs.snowflake.com/en/developer-guide/snowpark/reference/python/_autosummary/snowflake.snowpark.functions.html#module-snowflake.snowpark.functions).

In [13]:
# Stats per Month per Channel
snow_df_spend_per_channel = snow_df_spend.group_by(year('DATE'), month('DATE'),'CHANNEL').agg(sum('TOTAL_COST').as_('TOTAL_COST')).\
    with_column_renamed('"YEAR(DATE)"',"YEAR").with_column_renamed('"MONTH(DATE)"',"MONTH").sort('YEAR','MONTH')

snow_df_spend_per_channel.show(10)

---------------------------------------------------
|"YEAR"  |"MONTH"  |"CHANNEL"      |"TOTAL_COST"  |
---------------------------------------------------
|2012    |5        |search_engine  |516431        |
|2012    |5        |video          |516729        |
|2012    |5        |email          |517208        |
|2012    |5        |social_media   |517618        |
|2012    |6        |video          |501098        |
|2012    |6        |search_engine  |506497        |
|2012    |6        |social_media   |504679        |
|2012    |6        |email          |501947        |
|2012    |7        |search_engine  |522780        |
|2012    |7        |email          |518405        |
---------------------------------------------------



### 채널에서 피벗

  _pivot()_ 및 _sum()_ Snowpark DataFrame 함수를 사용하여 **각 행이 모든 채널의 총 비용**을 연/월로 나타내도록 캠페인 지출 데이터를 추가로 변환해 보겠습니다. 이 변환을 통해 모델 교육을 위한 단일 테이블에 입력 기능과 대상 변수가 있도록 수익 테이블과 조인할 수 있습니다.

 TIP: For a full list of functions, refer to the [docs](https://docs.snowflake.com/en/developer-guide/snowpark/reference/python/_autosummary/snowflake.snowpark.functions.html#module-snowflake.snowpark.functions).

In [14]:
snow_df_spend_per_month = snow_df_spend_per_channel.pivot('CHANNEL',['search_engine','social_media','video','email']).sum('TOTAL_COST').sort('YEAR','MONTH')
snow_df_spend_per_month = snow_df_spend_per_month.select(
    col("YEAR"),
    col("MONTH"),
    col("'search_engine'").as_("SEARCH_ENGINE"),
    col("'social_media'").as_("SOCIAL_MEDIA"),
    col("'video'").as_("VIDEO"),
    col("'email'").as_("EMAIL")
)
snow_df_spend_per_month.show()

---------------------------------------------------------------------------
|"YEAR"  |"MONTH"  |"SEARCH_ENGINE"  |"SOCIAL_MEDIA"  |"VIDEO"  |"EMAIL"  |
---------------------------------------------------------------------------
|2012    |5        |516431           |517618          |516729   |517208   |
|2012    |6        |506497           |504679          |501098   |501947   |
|2012    |7        |522780           |521395          |522762   |518405   |
|2012    |8        |519959           |520537          |520685   |521584   |
|2012    |9        |507211           |507404          |511364   |507363   |
|2012    |10       |518942           |520863          |522768   |519950   |
|2012    |11       |505715           |505221          |505292   |503748   |
|2012    |12       |520148           |520711          |521427   |520724   |
|2013    |1        |522151           |518635          |520583   |521167   |
|2013    |2        |467736           |474679          |469856   |469784   |
------------

### 월별 총 수익

이제 수익 테이블을 로드하고 _group_by_ 및 _agg()_ 함수를 사용하여 데이터를 연간/월별 수익으로 변환해 보겠습니다.

In [15]:
df = session.sql(
    """SELECT $1::NUMBER(38, 0) AS YEAR, 
              $2::NUMBER(38, 0) AS MONTH, 
              $3::FLOAT AS REVENUE
       FROM @MONTHLY_REVENUE_DATA_STAGE""")

df.write.save_as_table("MONTHLY_REVENUE", mode="overwrite")

In [16]:
snow_df_revenue = session.table('monthly_revenue')
snow_df_revenue_per_month = snow_df_revenue.group_by('YEAR','MONTH').agg(sum('REVENUE')).sort('YEAR','MONTH').with_column_renamed('SUM(REVENUE)','REVENUE')
snow_df_revenue_per_month.show()

---------------------------------
|"YEAR"  |"MONTH"  |"REVENUE"   |
---------------------------------
|2012    |5        |3264300.11  |
|2012    |6        |3208482.33  |
|2012    |7        |3311966.98  |
|2012    |8        |3311752.81  |
|2012    |9        |3208563.06  |
|2012    |10       |3334028.46  |
|2012    |11       |3185894.64  |
|2012    |12       |3334570.96  |
|2013    |1        |3316455.44  |
|2013    |2        |2995042.21  |
---------------------------------



### 월별 총 지출 및 총 수익 가입

다음으로 **이 수익 데이터를 변환된 캠페인 지출 데이터와 결합**하여 입력 기능(예: 채널당 비용) 및 대상 변수(예: 수익)를 모델 학습을 위한 단일 테이블에 로드할 수 있습니다.

In [17]:
snow_df_spend_and_revenue_per_month = snow_df_spend_per_month.join(snow_df_revenue_per_month, ["YEAR","MONTH"])
snow_df_spend_and_revenue_per_month.show()

----------------------------------------------------------------------------------------
|"YEAR"  |"MONTH"  |"SEARCH_ENGINE"  |"SOCIAL_MEDIA"  |"VIDEO"  |"EMAIL"  |"REVENUE"   |
----------------------------------------------------------------------------------------
|2012    |5        |516431           |517618          |516729   |517208   |3264300.11  |
|2012    |6        |506497           |504679          |501098   |501947   |3208482.33  |
|2012    |7        |522780           |521395          |522762   |518405   |3311966.98  |
|2012    |8        |519959           |520537          |520685   |521584   |3311752.81  |
|2012    |9        |507211           |507404          |511364   |507363   |3208563.06  |
|2012    |10       |518942           |520863          |522768   |519950   |3334028.46  |
|2012    |11       |505715           |505221          |505292   |503748   |3185894.64  |
|2012    |12       |520148           |520711          |521427   |520724   |3334570.96  |
|2013    |1        |5

### >>>>>>>>>> *Snowpark DataFrame 쿼리 및 실행 계획 검토* <<<<<<<<<<

Snowpark는 _explain()_ Snowpark DataFrame 함수를 사용하여 DataFrame 쿼리 및 실행 계획을 살펴보는 것이 정말 편리합니다.

In [18]:
snow_df_spend_and_revenue_per_month.explain()

---------DATAFRAME EXECUTION PLAN----------
Query List:
1.
SELECT  *  FROM (( SELECT "YEAR" AS "YEAR", "MONTH" AS "MONTH", "SEARCH_ENGINE" AS "SEARCH_ENGINE", "SOCIAL_MEDIA" AS "SOCIAL_MEDIA", "VIDEO" AS "VIDEO", "EMAIL" AS "EMAIL" FROM ( SELECT "YEAR", "MONTH", "'search_engine'" AS "SEARCH_ENGINE", "'social_media'" AS "SOCIAL_MEDIA", "'video'" AS "VIDEO", "'email'" AS "EMAIL" FROM ( SELECT  *  FROM ( SELECT  *  FROM ( SELECT  *  FROM ( SELECT "YEAR(DATE)" AS "YEAR", "MONTH(DATE)" AS "MONTH", "CHANNEL", "TOTAL_COST" FROM ( SELECT year("DATE") AS "YEAR(DATE)", month("DATE") AS "MONTH(DATE)", "CHANNEL", sum("TOTAL_COST") AS "TOTAL_COST" FROM ( SELECT  *  FROM campaign_spend) GROUP BY year("DATE"), month("DATE"), "CHANNEL")) ORDER BY "YEAR" ASC NULLS FIRST, "MONTH" ASC NULLS FIRST) PIVOT (sum("TOTAL_COST") FOR "CHANNEL" IN ('search_engine', 'social_media', 'video', 'email'))) ORDER BY "YEAR" ASC NULLS FIRST, "MONTH" ASC NULLS FIRST))) AS SNOWPARK_TEMP_TABLE_QAMZ3F2OMD INNER JOIN ( SELECT 

### Snowflake의 모델 훈련

#### 기능 및 타겟

이 시점에서 우리는 모델 교육을 위해 기능과 대상을 저장하기 위해 다음 작업을 수행할 준비가 되었습니다.

* 누락된 값이 있는 행 삭제
* 모델링에 필요하지 않은 열 제외
* 기능을 MARKETING_BUDGETS_FEATURES라는 Snowflake 테이블에 저장

TIP: To see how to handle missing values in Snowpark Python, refer to this [blog](https://medium.com/snowflake/handling-missing-values-with-snowpark-for-python-part-1-4af4285d24e6).

In [19]:
# Delete rows with missing values
snow_df_spend_and_revenue_per_month = snow_df_spend_and_revenue_per_month.dropna()

# Exclude columns we don't need for modeling
snow_df_spend_and_revenue_per_month = snow_df_spend_and_revenue_per_month.drop(['YEAR','MONTH'])

# Save features into a Snowflake table call MARKETING_BUDGETS_FEATURES
snow_df_spend_and_revenue_per_month.write.mode('overwrite').save_as_table('MARKETING_BUDGETS_FEATURES')
snow_df_spend_and_revenue_per_month.show()

---------------------------------------------------------------------
|"SEARCH_ENGINE"  |"SOCIAL_MEDIA"  |"VIDEO"  |"EMAIL"  |"REVENUE"   |
---------------------------------------------------------------------
|516431           |517618          |516729   |517208   |3264300.11  |
|506497           |504679          |501098   |501947   |3208482.33  |
|522780           |521395          |522762   |518405   |3311966.98  |
|519959           |520537          |520685   |521584   |3311752.81  |
|507211           |507404          |511364   |507363   |3208563.06  |
|518942           |520863          |522768   |519950   |3334028.46  |
|505715           |505221          |505292   |503748   |3185894.64  |
|520148           |520711          |521427   |520724   |3334570.96  |
|522151           |518635          |520583   |521167   |3316455.44  |
|467736           |474679          |469856   |469784   |2995042.21  |
---------------------------------------------------------------------



#### scikit-learn을 사용하여 선형 회귀 모델을 교육하는 Python 함수

**scikit-learn 및 이미** [Snowflake Anaconda 채널](https://repo.anaconda.com/pkgs/snowflake/)에 포함되어 있어 서버에서 사용할 수 있는 기타 패키지를 사용하는 Python 함수를 만들어 보겠습니다. Snowflake에서 실행되는 저장 프로시저로 Python 함수를 실행할 때 측.

이 함수는 다음을 매개변수로 사용합니다.

* _session_: 눈송이 세션 객체.
* _features_table_: 기능 및 대상 변수를 보유하는 테이블의 이름.
* _number_of_folds_: GridSearchCV에서 사용되는 교차 유효성 검사 접기의 수입니다.
* _polynomial_features_degress_: 전처리 단계로서의 PolynomialFeatures.
* _train_accuracy_threshold_: 기차 데이터 세트의 정확도 임계값입니다. 이 값은 모델을 저장해야 하는지 여부를 결정하는 데 사용됩니다.
* _test_accuracy_threshold_: 테스트 데이터 세트의 정확도 임계값입니다. 이 값은 모델을 저장해야 하는지 여부를 결정하는 데 사용됩니다.
* _save_model_: 정확도 임계값이 충족되는 경우 모델을 저장해야 하는지 여부를 결정하는 부울입니다.

TIP: For large datasets, Snowflake offers [Snowpark-optimized Warehouses](https://docs.snowflake.com/en/user-guide/warehouses-snowpark-optimized.html) which are in Public Preview as of Nov 2022).



In [20]:
def train_revenue_prediction_model(
    session: Session, 
    features_table: str, 
    number_of_folds: int, 
    polynomial_features_degrees: int, 
    train_accuracy_threshold: float, 
    test_accuracy_threshold: float, 
    save_model: bool) -> Variant:
    
    import os

    from joblib import dump
    from sklearn.compose import ColumnTransformer
    from sklearn.linear_model import LinearRegression
    from sklearn.model_selection import GridSearchCV, train_test_split
    from sklearn.pipeline import Pipeline
    from sklearn.preprocessing import PolynomialFeatures, StandardScaler

    # Load features
    df = session.table(features_table).to_pandas()

    # Preprocess the Numeric columns
    # We apply PolynomialFeatures and StandardScaler preprocessing steps to the numeric columns
    # NOTE: High degrees can cause overfitting.
    numeric_features = ['SEARCH_ENGINE','SOCIAL_MEDIA','VIDEO','EMAIL']
    numeric_transformer = Pipeline(steps=[('poly',PolynomialFeatures(degree = polynomial_features_degrees)),('scaler', StandardScaler())])

    # Combine the preprocessed step together using the Column Transformer module
    preprocessor = ColumnTransformer(
        transformers=[
            ('num', numeric_transformer, numeric_features)])

    # The next step is the integrate the features we just preprocessed with our Machine Learning algorithm to enable us to build a model
    pipeline = Pipeline(steps=[('preprocessor', preprocessor),('classifier', LinearRegression())])
    parameteres = {}

    X = df.drop('REVENUE', axis = 1)
    y = df['REVENUE']

    # Split dataset into training and test
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state = 42)

    # Use GridSearch to find the best fitting model based on number_of_folds folds
    model = GridSearchCV(pipeline, param_grid=parameteres, cv=number_of_folds)

    model.fit(X_train, y_train)
    train_r2_score = model.score(X_train, y_train)
    test_r2_score = model.score(X_test, y_test)

    model_saved = False

    if save_model:
        if train_r2_score >= train_accuracy_threshold and test_r2_score >= test_accuracy_threshold:
            # Upload trained model to a stage
            model_output_dir = '/tmp'
            model_file = os.path.join(model_output_dir, 'model.joblib')
            dump(model, model_file)
            session.file.put(model_file,"@PYTHON_MODELS",overwrite=True)
            model_saved = True

    # Return model R2 score on train and test data
    return {"R2 score on Train": train_r2_score,
            "R2 threshold on Train": train_accuracy_threshold,
            "R2 score on Test": test_r2_score,
            "R2 threshold on Test": test_accuracy_threshold,
            "Model saved": model_saved}

#### Snowflake에 저장 프로시저로 배포하기 전에 Python 함수 테스트

테스트 모드에 있으므로 모델이 아직 저장되지 않도록 _save_model = False_로 설정합니다.

In [21]:
cross_validaton_folds = 10
polynomial_features_degrees = 2
train_accuracy_threshold = 0.85
test_accuracy_threshold = 0.85
save_model = False

train_revenue_prediction_model(
    session,
    "MARKETING_BUDGETS_FEATURES",
    cross_validaton_folds,
    polynomial_features_degrees,
    train_accuracy_threshold,
    test_accuracy_threshold,
    save_model)

{'R2 score on Train': 0.9954552822793986,
 'R2 threshold on Train': 0.85,
 'R2 score on Test': 0.8817971097765244,
 'R2 threshold on Test': 0.85,
 'Model saved': False}

### Snowflake에 모델 학습 코드를 배포하기 위한 저장 프로시저 만들기

테스트가 완료되고 모델에 만족한다고 가정하고 패키지(_snowflake-snowpark-python, scikit-learn 및 joblib_)를 제공하여 **register the model training Python function as a Snowpark Python Stored Procedure** 등록 합니다. 실행 중에 필요하고 사용합니다.

TIP: For more information on Snowpark Python Stored Procedures, refer to the [docs](https://docs.snowflake.com/en/sql-reference/stored-procedures-python.html).

In [22]:
session.sproc.register(
    func=train_revenue_prediction_model,
    name="train_revenue_prediction_model",
    packages=['snowflake-snowpark-python','scikit-learn','joblib'],
    is_permanent=True,
    stage_location="@PYTHON_CODE",
    replace=True)

<snowflake.snowpark.stored_procedure.StoredProcedure at 0x1454c69ce20>

### >>>>>>>>>> *Examine Query History in Snowsight* <<<<<<<<<<

### Execute Stored Procedure to train model and deploy it on Snowflake

이제 모델을 훈련하고 Snowflake 단계에 저장할 준비가 되었으므로 _save_model = True_로 설정하고 _session.call()_ 함수를 사용하여 저장 프로시저를 실행/실행합니다.

In [23]:
cross_validaton_folds = 10
polynomial_features_degrees = 2
train_accuracy_threshold = 0.85
test_accuracy_threshold = 0.85
save_model = True

print(session.call('train_revenue_prediction_model',
                    'MARKETING_BUDGETS_FEATURES',
                    cross_validaton_folds,
                    polynomial_features_degrees,
                    train_accuracy_threshold,
                    test_accuracy_threshold,
                    save_model))

{
  "Model saved": true,
  "R2 score on Test": 0.8817971097765288,
  "R2 score on Train": 0.9954552822793986,
  "R2 threshold on Test": 0.85,
  "R2 threshold on Train": 0.85
}


### >>>>>>>>>> *Examine Query History in Snowsight* <<<<<<<<<<

### Create Scalar User-Defined Function (UDF) for inference

이제 추론을 위해 이 모델을 배포하기 위해 **Snowpark Python UDF를 생성 및 등록하고 훈련된 모델을 종속성으로 추가**해 보겠습니다. 일단 등록되면 새로운 예측을 얻는 것은 데이터를 전달하여 함수를 호출하는 것만큼 간단합니다.

*참고: 스칼라 UDF는 단일 행/데이터 포인트 집합에서 작동하며 실시간 온라인 추론에 적합합니다. 그리고 이 UDF는 Streamlit 앱에서 호출됩니다.. See [Snowpark_Streamlit_Revenue_Prediction.py](Snowpark_Streamlit_Revenue_Prediction.py)*

TIP: For more information on Snowpark Python User-Defined Functions, refer to the [docs](https://docs.snowflake.com/en/developer-guide/snowpark/python/creating-udfs.html).

In [24]:
session.clear_imports()
session.clear_packages()

# Add trained model and Python packages from Snowflake Anaconda channel available on the server-side as UDF dependencies
session.add_import('@PYTHON_MODELS/model.joblib.gz')
session.add_packages('pandas','joblib','scikit-learn==1.1.1')

@udf(name='predict_roi',session=session,replace=True,is_permanent=True,stage_location='@PYTHON_CODE')
def predict_roi(budget_allocations: list) -> float:
    import sys

    import pandas as pd
    import sklearn
    from joblib import load

    IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
    import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]
    
    model_file = import_dir + 'model.joblib.gz'
    model = load(model_file)
            
    features = ['SEARCH_ENGINE','SOCIAL_MEDIA','VIDEO','EMAIL']
    df = pd.DataFrame([budget_allocations], columns=features)
    roi = abs(model.predict(df)[0])
    return roi

<div style='text-align: center'>
    <img src="assets/snowpark_python_udfs.png" alt="Snowpark" style="width: 75%;"/>
</div>

### 새 데이터에 대한 추론을 위해 Scalar 사용자 정의 함수(UDF) 호출

UDF가 등록되면 _call_udf()_ Snowpark Python 함수를 호출하고 새 데이터 포인트를 전달하는 것만큼 간단하게 새로운 예측을 얻을 수 있습니다.

몇 가지 샘플 데이터로 SnowPark DataFrame을 만들고 UDF를 호출하여 새로운 예측을 얻도록 하겠습니다.

 *NOTE: This UDF is also called from the Streamlit App. See [Snowpark_Streamlit_Revenue_Prediction.py](Snowpark_Streamlit_Revenue_Prediction.py)*

In [25]:
test_df = session.create_dataframe([[250000,250000,200000,450000],[500000,500000,500000,500000],[8500,9500,2000,500]], 
                                    schema=['SEARCH_ENGINE','SOCIAL_MEDIA','VIDEO','EMAIL'])
test_df.select(
    'SEARCH_ENGINE','SOCIAL_MEDIA','VIDEO','EMAIL', 
    call_udf("predict_roi", 
    array_construct(col("SEARCH_ENGINE"), col("SOCIAL_MEDIA"), col("VIDEO"), col("EMAIL"))).as_("PREDICTED_ROI")).show()

-----------------------------------------------------------------------------
|"SEARCH_ENGINE"  |"SOCIAL_MEDIA"  |"VIDEO"  |"EMAIL"  |"PREDICTED_ROI"     |
-----------------------------------------------------------------------------
|250000           |250000          |200000   |450000   |4072491.441724832   |
|500000           |500000          |500000   |500000   |3179613.166194174   |
|8500             |9500            |2000     |500      |189866.83304576762  |
-----------------------------------------------------------------------------



### 추론을 위해 Batch API를 사용하여 벡터화된 사용자 정의 함수(UDF) 생성

여기서는 Python UDF Batch API를 활용하여 Pandas Dataframe을 입력으로 사용하는 **벡터화된** UDF를 생성합니다. 이것은 UDF에 대한 각 호출이 하나의 행을 입력으로 가져오는 Scalar UDF와 비교하여 행 세트/일괄 처리를 수신함을 의미합니다.

먼저 **cachetools**를 사용하는 도우미 함수 _load_model()_을 생성하여 모델을 한 번만 로드한 다음 추론을 수행하는 _batch_predict_roi()_ 함수를 로드합니다.

_참고: 벡터화된 UDF는 배치 모드에서 오프라인 추론에 적합합니다._

Scalar UDF보다 Batch API를 사용하는 이점:

* Python 코드가 행 배치에서 효율적으로 작동하는 경우 성능 향상 가능성
* Pandas DataFrames 또는 Pandas 배열에서 작동하는 라이브러리를 호출하는 경우 필요한 변환 로직 감소

TIP: For more information on Snowpark Python UDF Batch API, refer to the [docs](https://docs.snowflake.com/en/developer-guide/udf/python/udf-python-batch.html#getting-started-with-the-batch-api).

In [26]:
session.clear_imports()
session.clear_packages()

import cachetools
from snowflake.snowpark.types import PandasDataFrame, PandasSeries

# Add trained model and Python packages from Snowflake Anaconda channel available on the server-side as UDF dependencies
session.add_import('@PYTHON_MODELS/model.joblib.gz')
session.add_packages('pandas','joblib','scikit-learn','cachetools')

@cachetools.cached(cache={})
def load_model(filename):
    import os
    import sys

    import joblib

    IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
    import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]

    if import_dir:
        with open(os.path.join(import_dir, filename), 'rb') as file:
            m = joblib.load(file)
            return m

@udf(name='batch_predict_roi',session=session,replace=True,is_permanent=True,stage_location='@PYTHON_CODE')
def batch_predict_roi(budget_allocations_df: PandasDataFrame[int, int, int, int]) -> PandasSeries[float]:
    import sklearn
    budget_allocations_df.columns = ['SEARCH_ENGINE','SOCIAL_MEDIA','VIDEO','EMAIL']
    model = load_model('model.joblib.gz')
    return abs(model.predict(budget_allocations_df))

### 새 데이터에 대한 추론을 위해 Batch API를 사용하여 벡터화된 사용자 정의 함수(UDF) 호출

Batch API를 사용하는 경우:

* Python UDF를 사용하여 쿼리를 작성하는 방법을 변경할 필요가 없습니다. 모든 일괄 처리는 자체 코드가 아닌 UDF 프레임워크에서 처리됩니다.
* 참고: 비배치/스칼라 API와 마찬가지로 처리기 코드의 어떤 인스턴스가 입력의 어떤 배치를 볼 것인지 보장할 수 없습니다.

In [27]:
test_df.select(
    'SEARCH_ENGINE','SOCIAL_MEDIA','VIDEO','EMAIL', 
    call_udf("batch_predict_roi", 
    col("SEARCH_ENGINE"), col("SOCIAL_MEDIA"), col("VIDEO"), col("EMAIL")).as_("PREDICTED_ROI")).show()

-----------------------------------------------------------------------------
|"SEARCH_ENGINE"  |"SOCIAL_MEDIA"  |"VIDEO"  |"EMAIL"  |"PREDICTED_ROI"     |
-----------------------------------------------------------------------------
|8500             |9500            |2000     |500      |189866.83304576762  |
|250000           |250000          |200000   |450000   |4072491.441724832   |
|500000           |500000          |500000   |500000   |3179613.166194174   |
-----------------------------------------------------------------------------



**Snowpark Stored Procedures vs User-Defined Functions**

_일반적으로 각 행/배치가 독립적으로 처리될 수 있는 방식으로 대규모 데이터 세트를 처리하는 경우 처리가 웨어하우스 전체에서 자동으로 병렬화/확장되므로 UDF가 항상 더 좋습니다. 예를 들어 훈련된 ML 모델이 이미 있고 수십억 개의 행에서 해당 모델을 사용하여 추론을 수행하는 경우입니다. 이 경우 각 행/배치를 독립적으로 계산할 수 있습니다._

_사용 사례에 전체 데이터 세트가 메모리에 있어야 하는 경우(예: ML 교육) 저장 프로시저를 사용하는 것이 좋습니다. 저장 프로시저는 단일 웨어하우스 노드에서 실행되는 Python 프로그램일 뿐입니다. (UDF를 사용하면 처리가 한 번에 한 배치씩 스트리밍 방식으로 수행되기 때문에 전체 데이터 세트를 메모리에 로드할 수 없습니다._

### Snowflake 작업을 사용하여 Data Pipeline 및 Model (re)Training 자동화

또한 선택적으로 Snowflake(서버리스 또는 사용자 관리) 작업을 생성하여 설정된 일정에 따라 모델의 데이터 파이프라인 및 (재)훈련을 자동화할 수 있습니다.

_참고: Snowpark Python API(SQL 대신)를 사용하여 작업을 생성하는 것은 로드맵에 있습니다. 계속 지켜봐! 또는 [Twitter]((https://twitter.com/iamontheinet)_)에서 저를 팔로우하여 누구보다 먼저 소식을 받아보세요 :)_

팁: 무엇보다도 Amazon Simple Notification Service(SNS)와 같은 클라우드 메시징 서비스를 사용하여 오류 알림(현재 Private Preview)에 대한 작업을 구성할 수도 있습니다. Snowflake 작업에 대한 자세한 내용은 [문서](https://docs.snowflake.com/en/user-guide/tasks-intro.html)를 참조하세요.

#### 데이터 파이프라인 및 기능 엔지니어링을 위한 Python 함수 만들기

In [28]:
def data_pipeline_feature_engineering(session: Session) -> str:

  # DATA TRANSFORMATIONS
  # Perform the following actions to transform the data

  # Load the campaign spend data
  snow_df_spend = session.table('campaign_spend')

  # Transform the data so we can see total cost per year/month per channel using group_by() and agg() Snowpark DataFrame functions
  snow_df_spend_per_channel = snow_df_spend.group_by(year('DATE'), month('DATE'),'CHANNEL').agg(sum('TOTAL_COST').as_('TOTAL_COST')).\
      with_column_renamed('"YEAR(DATE)"',"YEAR").with_column_renamed('"MONTH(DATE)"',"MONTH").sort('YEAR','MONTH')

  # Transform the data so that each row will represent total cost across all channels per year/month using pivot() and sum() Snowpark DataFrame functions
  snow_df_spend_per_month = snow_df_spend_per_channel.pivot('CHANNEL',['search_engine','social_media','video','email']).sum('TOTAL_COST').sort('YEAR','MONTH')
  snow_df_spend_per_month = snow_df_spend_per_month.select(
      col("YEAR"),
      col("MONTH"),
      col("'search_engine'").as_("SEARCH_ENGINE"),
      col("'social_media'").as_("SOCIAL_MEDIA"),
      col("'video'").as_("VIDEO"),
      col("'email'").as_("EMAIL")
  )

  # Load revenue table and transform the data into revenue per year/month using group_by and agg() functions
  snow_df_revenue = session.table('monthly_revenue')
  snow_df_revenue_per_month = snow_df_revenue.group_by('YEAR','MONTH').agg(sum('REVENUE')).sort('YEAR','MONTH').with_column_renamed('SUM(REVENUE)','REVENUE')

  # Join revenue data with the transformed campaign spend data so that our input features (i.e. cost per channel) and target variable (i.e. revenue) can be loaded into a single table for model training
  snow_df_spend_and_revenue_per_month = snow_df_spend_per_month.join(snow_df_revenue_per_month, ["YEAR","MONTH"])

  # SAVE FEATURES And TARGET
  # Perform the following actions to save features and target for model training

  # Delete rows with missing values
  snow_df_spend_and_revenue_per_month = snow_df_spend_and_revenue_per_month.dropna()

  # Exclude columns we don't need for modeling
  snow_df_spend_and_revenue_per_month = snow_df_spend_and_revenue_per_month.drop(['YEAR','MONTH'])

  # Save features into a Snowflake table call MARKETING_BUDGETS_FEATURES
  snow_df_spend_and_revenue_per_month.write.mode('overwrite').save_as_table('MARKETING_BUDGETS_FEATURES')

  return "SUCCESS"

#### Snowflake에 데이터 파이프라인 기능 엔지니어링 코드를 배포하기 위한 저장 프로시저 생성

TIP: For more information on Snowpark Python Stored Procedures, refer to the [docs](https://docs.snowflake.com/en/sql-reference/stored-procedures-python.html).

In [29]:
session.sproc.register(
    func=data_pipeline_feature_engineering,
    name="data_pipeline_feature_engineering",
    packages=['snowflake-snowpark-python'],
    is_permanent=True,
    stage_location="@PYTHON_CODE",
    replace=True)

<snowflake.snowpark.stored_procedure.StoredProcedure at 0x1454c69c9a0>

#### 저장 프로시저를 실행하여 Snowflake에 데이터 파이프라인 기능 엔지니어링 코드 배포

In [30]:
print(session.call('data_pipeline_feature_engineering'))

SUCCESS


####  Root/Parent Snowflake 만들기 작업: 데이터 파이프라인 및 기능 엔지니어링

In [31]:
create_data_pipeline_feature_engineering_task = """
CREATE OR REPLACE TASK data_pipeline_feature_engineering_task
    WAREHOUSE = 'SNOWPARK_DEMO_WH'
    SCHEDULE  = '1 MINUTE'
AS
    CALL data_pipeline_feature_engineering()
"""
session.sql(create_data_pipeline_feature_engineering_task).collect()

[Row(status='Task DATA_PIPELINE_FEATURE_ENGINEERING_TASK successfully created.')]

#### Create Child/Dependent Snowflake Task: Model training on Snowflake

In [32]:
create_model_training_task = """
CREATE OR REPLACE TASK model_training_task
    WAREHOUSE = 'SNOWPARK_DEMO_WH'
    AFTER data_pipeline_feature_engineering_task
AS
    CALL train_revenue_prediction_model('MARKETING_BUDGETS_FEATURES',10,2,0.85,0.85,True)
"""
session.sql(create_model_training_task).collect()

[Row(status='Task MODEL_TRAINING_TASK successfully created.')]

#### Resume Tasks

In [33]:
session.sql("alter task model_training_task resume").collect()
session.sql("alter task data_pipeline_feature_engineering_task resume").collect()

[Row(status='Statement executed successfully.')]

#### Cleanup Resources

In [34]:
session.sql("alter task data_pipeline_feature_engineering_task suspend").collect()
session.sql("alter task model_training_task suspend").collect()

[Row(status='Statement executed successfully.')]

_For comments and feedback, please reach out to dash.desai@snowflake.com | Follow on [Twitter](https://twitter.com/iamontheinet)_ 