# プロジェクト概要
Snowpark for Python、Snowpark ML、Streamlit を使用して、検索、動画、ソーシャルメディア、電子メールなどの複数のチャネルで変動する広告費予算の将来の ROI（Return On Investment）を予測する線形回帰モデルを学習するためのデータ分析とデータ準備タスクを実行します。

セッションの最後には、さまざまな広告費予算の ROI を視覚化するインタラクティブなウェブアプリケーションがデプロイされます。

Snowpark for Python および Snowpark ML は Amazon SageMaker Studio、Streamlit は Streamlit in Snowflake を用いて実行します。

### データエンジニアリング -- データ分析と準備
このノートブックでは、Snowpark for Python を使用した Snowflake でのデータエンジニアリングに焦点を当てます。

* Snowflake へのセキュアな接続の確立
* Snowflake テーブルから Snowpark DataFrames へのデータのロード
* Snowpark DataFrames での探索的データ分析の実行
* Snowpark DataFrames を使用した複数テーブルからのデータのピボットと結合
* Snowflake タスクを使用したデータ準備の自動化

### 環境準備
以下の情報でカーネルを起動し、下のセルのコマンドから必要なライブラリをインストールしてください。
* Image: PyTorch 1.13 Python 3.9 CPU Optimized
* Kernel: Python 3
* Instance: ml.t3.medium

In [None]:
# ライブラリのインストール（いくつかのライブラリインストールでエラーが起きても無視して進みます）
!pip install snowflake-snowpark-python==1.5.0 pandas notebook scikit-learn cachetools pyarrow==10.0.1 snowflake-ml-python==1.0.2

### ライブラリのインポート

In [None]:
# Snowpark for Python
from snowflake.snowpark.session import Session
from snowflake.snowpark.functions import month,year,col,sum
from snowflake.snowpark.version import VERSION

# Misc
import json
import logging 
logger = logging.getLogger("snowflake.snowpark.session")
logger.setLevel(logging.ERROR)

### Snowflake へのセキュアな接続の確立

Snowpark Python API を使用すると、Snowflake と Notebook 間のセキュアな接続を素早く簡単に確立できます。

TIP: [Session](https://docs.snowflake.com/developer-guide/snowpark/reference/python/session.html) オブジェクトについて

In [None]:
# Snowflake Session オブジェクトの作成
connection_parameters = json.load(open('connection.json'))
session = Session.builder.configs(connection_parameters).create()
session.sql_simplifier_enabled = True

snowflake_environment = session.sql('select current_user(), current_version()').collect()
snowpark_version = VERSION

# 環境の詳細（connections.json の編集内容が適切に反映されているか確認してください）
print('User                        : {}'.format(snowflake_environment[0][0]))
print('Role                        : {}'.format(session.get_current_role()))
print('Database                    : {}'.format(session.get_current_database()))
print('Schema                      : {}'.format(session.get_current_schema()))
print('Warehouse                   : {}'.format(session.get_current_warehouse()))
print('Snowflake version           : {}'.format(snowflake_environment[0][1]))
print('Snowpark for Python version : {}.{}.{}'.format(snowpark_version[0],snowpark_version[1],snowpark_version[2]))

### 集計されたキャンペーン支出データを Snowflake テーブルから Snowpark DataFrame にロードする

まずキャンペーン支出データをロードしましょう。
このテーブルには、検索エンジン、ソーシャルメディア、Eメール、動画などのデジタル広告チャネルにおける日々の支出を示すために集計された広告クリックデータが含まれています。

*Note: Snowpark DataFrame にデータをロードするいくつかの方法があります*
* *session.sql("select col1, col2... from tableName")*
* *session.read.options({"field_delimiter": ",", "skip_header": 1}).schema(user_schema).csv("@mystage/testCSV.csv")*
* *session.read.parquet("@stageName/path/to/file")*
* *session.create_dataframe([1,2,3], schema=["col1"])*

TIP: [Snowpark DataFrames](https://docs.snowflake.com/en/developer-guide/snowpark/reference/python/_autosummary/snowflake.snowpark.html#snowflake.snowpark.DataFrame) について


In [None]:
snow_df_spend = session.table('campaign_spend')
snow_df_spend.queries

*show()、collect()、count()* のようなアクションは、サーバ上で実行される DataFrame SQL を送信します。

*Note: History オブジェクトは、サーバ上で実行された SQL クエリと同様に、デバッグに役立つクエリ ID を提供します。*

In [None]:
with session.query_history() as history:
    snow_df_spend.show(20)
history.queries

### 全チャネルの年/月間総支出額

_group_by()_ と _agg()_ Snowpark DataFrame 関数を使用して、チャネルごとの年間/月間総費用を見ることができるようにデータを変換してみましょう。

TIP: 関数のリストは [documentation](https://docs.snowflake.com/developer-guide/snowpark/reference/python/latest/index) を参照してください。

In [None]:
# Stats per Month per Channel
snow_df_spend_per_channel = snow_df_spend.group_by(year('DATE'), month('DATE'),'CHANNEL').agg(sum('TOTAL_COST').as_('TOTAL_COST')).\
    with_column_renamed('"YEAR(DATE)"',"YEAR").with_column_renamed('"MONTH(DATE)"',"MONTH").sort('YEAR','MONTH')

snow_df_spend_per_channel.show(10)

### チャネルでピボット: 全チャネルの総支出額

さらに、_pivot()_ と _sum()_ Snowpark DataFrame 関数を使用して、**各行が 1 年/月あたりの全チャネル**の合計コストを表すように、キャンペーンの支出データを変換しましょう。
この変換により、収益テーブルと結合することができ、モデルトレーニングのために、入力フィーチャーとターゲット変数を 1 つのテーブルに持つことができます。

TIP: 関数のリストは [documentation](https://docs.snowflake.com/developer-guide/snowpark/reference/python/latest/index) を参照してください。

In [None]:
snow_df_spend_per_month = snow_df_spend_per_channel.pivot('CHANNEL',['search_engine','social_media','video','email']).sum('TOTAL_COST').sort('YEAR','MONTH')
snow_df_spend_per_month = snow_df_spend_per_month.select(
    col("YEAR"),
    col("MONTH"),
    col("'search_engine'").as_("SEARCH_ENGINE"),
    col("'social_media'").as_("SOCIAL_MEDIA"),
    col("'video'").as_("VIDEO"),
    col("'email'").as_("EMAIL")
)
snow_df_spend_per_month.show()

### 変換されたデータを Snowflake テーブルに保存する

変換されたデータを Snowflake テーブル *SPEND_PER_MONTH* に保存してみましょう。

In [None]:
snow_df_spend_per_month.write.mode('overwrite').save_as_table('SPEND_PER_MONTH')

### オプション - 自動化: Snowflake タスクとしてキャンペーン支出データ変換を実行する

*Note: オプションとして、コードを Snowpark ストアドプロシージャとしてSnowflake にデプロイし、Snowflake タスクとして実行することで、これらの変換をすべて自動タスクとして実行することができます。*

TIP: [Stored Procedures](https://docs.snowflake.com/en/sql-reference/stored-procedures-python) および [Snowflake Tasks](https://docs.snowflake.com/en/sql-reference/sql/create-task) について


In [None]:
def campaign_spend_data_pipeline(session: Session) -> str:
  # DATA TRANSFORMATIONS
  # Perform the following actions to transform the data

  # Load the campaign spend data
  snow_df_spend_t = session.table('campaign_spend')

  # Transform the data so we can see total cost per year/month per channel using group_by() and agg() Snowpark DataFrame functions
  snow_df_spend_per_channel_t = snow_df_spend_t.group_by(year('DATE'), month('DATE'),'CHANNEL').agg(sum('TOTAL_COST').as_('TOTAL_COST')).\
      with_column_renamed('"YEAR(DATE)"',"YEAR").with_column_renamed('"MONTH(DATE)"',"MONTH").sort('YEAR','MONTH')

  # Transform the data so that each row will represent total cost across all channels per year/month using pivot() and sum() Snowpark DataFrame functions
  snow_df_spend_per_month_t = snow_df_spend_per_channel_t.pivot('CHANNEL',['search_engine','social_media','video','email']).sum('TOTAL_COST').sort('YEAR','MONTH')
  snow_df_spend_per_month_t = snow_df_spend_per_month_t.select(
      col("YEAR"),
      col("MONTH"),
      col("'search_engine'").as_("SEARCH_ENGINE"),
      col("'social_media'").as_("SOCIAL_MEDIA"),
      col("'video'").as_("VIDEO"),
      col("'email'").as_("EMAIL")
  )

  # Save transformed data
  snow_df_spend_per_month_t.write.mode('overwrite').save_as_table('SPEND_PER_MONTH')

# Register data pipelining function as a Stored Procedure so it can be run as a task
session.sproc.register(
  func=campaign_spend_data_pipeline,
  name="campaign_spend_data_pipeline",
  packages=['snowflake-snowpark-python'],
  is_permanent=True,
  stage_location="@sprocs",
  replace=True)

# WH は必要に応じて変える
campaign_spend_data_pipeline_task = """
CREATE OR REPLACE TASK campaign_spend_data_pipeline_task
    WAREHOUSE = 'SHARED_HOL_WH'
    SCHEDULE  = '3 MINUTE'
AS
    CALL campaign_spend_data_pipeline()
"""
session.sql(campaign_spend_data_pipeline_task).collect()
    

### 年/月間総収益

収益テーブルをロードし、*group_by()* と *agg()* 関数を使用して、データを年/月ごとの収益に変換してみましょう。

In [None]:
snow_df_revenue = session.table('monthly_revenue')
snow_df_revenue_per_month = snow_df_revenue.group_by('YEAR','MONTH').agg(sum('REVENUE')).sort('YEAR','MONTH').with_column_renamed('SUM(REVENUE)','REVENUE')
snow_df_revenue_per_month.show()

### 全チャネルにおける年/月間の総支出額と総収益を結合する

入力特徴（例：チャネルごとのコスト）とターゲット変数（例：収益）がモデルトレーニングのために単一のテーブルにロードできるように、次に、**この収益データを変換されたキャンペーン支出データ**と結合してみましょう。

In [None]:
snow_df_spend_and_revenue_per_month = snow_df_spend_per_month.join(snow_df_revenue_per_month, ["YEAR","MONTH"])
snow_df_spend_and_revenue_per_month.show()

### >>>>>>>>>> *Snowpark DataFrameのクエリと実行プランの検証 <<<<<<<<<<

Snowpark では、_explain()_ Snowpark DataFrame 関数を使って、DataFrame のクエリと実行計画を見ることができます。

In [None]:
snow_df_spend_and_revenue_per_month.explain()

### 変換されたデータを Snowflake テーブルに保存

変換したデータを Snowflake テーブル *SPEND_AND_REVENUE_PER_MONTH* に保存してみましょう。

In [None]:
snow_df_spend_and_revenue_per_month.write.mode('overwrite').save_as_table('SPEND_AND_REVENUE_PER_MONTH')

### オプション - 自動化: Snowflake タスクとして月次収益データ変換を実行する

*Note: オプションとして、コードを Snowpark ストアドプロシージャとしてSnowflake にデプロイし、Snowflake タスクとして実行することで、これらの変換をすべて自動タスクとして実行することができます。このタスクでは、AFTER campaign_spend_data_pipeline_task 節があり、依存タスクになっていることに注意してください。*

TIP: [Stored Procedures](https://docs.snowflake.com/en/sql-reference/stored-procedures-python) および [Snowflake Tasks](https://docs.snowflake.com/en/sql-reference/sql/create-task) について

In [None]:
def monthly_revenue_data_pipeline(session: Session) -> str:
  # Load revenue table and transform the data into revenue per year/month using group_by and agg() functions
  snow_df_spend_per_month_t = session.table('spend_per_month')
  snow_df_revenue_t = session.table('monthly_revenue')
  snow_df_revenue_per_month_t = snow_df_revenue_t.group_by('YEAR','MONTH').agg(sum('REVENUE')).sort('YEAR','MONTH').with_column_renamed('SUM(REVENUE)','REVENUE')

  # Join revenue data with the transformed campaign spend data so that our input features (i.e. cost per channel) and target variable (i.e. revenue) can be loaded into a single table for model training
  snow_df_spend_and_revenue_per_month_t = snow_df_spend_per_month_t.join(snow_df_revenue_per_month_t, ["YEAR","MONTH"])

  # SAVE in a new table for the next task
  snow_df_spend_and_revenue_per_month_t.write.mode('overwrite').save_as_table('SPEND_AND_REVENUE_PER_MONTH')

# Register data pipelining function as a Stored Procedure so it can be run as a task
session.sproc.register(
  func=monthly_revenue_data_pipeline,
  name="monthly_revenue_data_pipeline",
  packages=['snowflake-snowpark-python'],
  is_permanent=True,
  stage_location="@sprocs",
  replace=True)

# WH は必要に応じて変える
monthly_revenue_data_pipeline_task = """
  CREATE OR REPLACE TASK monthly_revenue_data_pipeline_task
      WAREHOUSE = 'SHARED_HOL_WH'
      AFTER campaign_spend_data_pipeline_task
  AS
      CALL monthly_revenue_data_pipeline()
  """
session.sql(monthly_revenue_data_pipeline_task).collect()

### タスクの再開

*Note:* Snowflake タスクはデフォルトで中断されているため、以下のコマンドを実行して再開する必要があります。

In [None]:
# session.sql("alter task monthly_revenue_data_pipeline_task resume").collect()
# session.sql("alter task campaign_spend_data_pipeline_task resume").collect()

### タスクの一時停止

*Note: 上記のタスクを再開する場合は、以下のコマンドを実行して不要なリソースの使用を避けるために一時停止してください。*

In [None]:
# session.sql("alter task campaign_spend_data_pipeline_task suspend").collect()
# session.sql("alter task monthly_revenue_data_pipeline_task suspend").collect()