<div style="border-radius: 10px; border: #DEB887 solid; padding: 15px; background-color: #FFFAF0; font-size: 100%; text-align: left">
<h3 align="left"><font color='#3498DB'> Snowflake Python Worksheet Demo</font></h3>

* Snowflake Python Worksheetsで分析を行うためのコードの例を以下のwebサイトから引用します。
* Python Worksheetsで機械学習モデルの構築〜推論までやってみた
( https://datumstudio.jp/blog/0724_snowflake_python-worksheets/) 
```Python
# The Snowpark package is required for Python Worksheets. 
# You can add more packages by selecting them using the Packages control and then importing them.

import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import col

# モデルの学習からステージへの保存まで
from snowflake.snowpark.session import Session
import pandas as pd
import io
import joblib
# モデル準備
from sklearn.datasets import load_wine
from sklearn.ensemble import RandomForestClassifier
# 評価指標
from sklearn.metrics import accuracy_score 
# 内部ステージに登録するための関数
def save_file(session, model, path):
    input_stream = io.BytesIO()
    joblib.dump(model, input_stream)
    # upload_streamメソッドで学習したモデルを内部ステージに登録
    session._conn._cursor.upload_stream(input_stream, path) 
    return  

<div style="border-radius: 10px; border: #DEB887 solid; padding: 15px; background-color: #EFGG0F; font-size: 95%; text-align: left">
<h3 align="left"><font color='#3498DB'>🦄 Notes:</font></h3>

この関数は、学習済みのモデルをバイナリ形式で保存し、そのモデルをSnowflakeデータウェアハウス内の内部ステージにアップロードするためのものです。以下は関数内の各ステップについての詳細です：

* input_stream = io.BytesIO(): io.BytesIO() を使用して、バイナリデータを一時的に格納するためのバッファ（メモリ内のバイトデータを格納するストリーム）を作成します。このストリームは後で joblib.dump でモデルを保存するために使用されます。

* joblib.dump(model, input_stream): joblib ライブラリの dump 関数を使用して、学習済みモデルを input_stream にバイナリ形式で保存します。これにより、モデルがバイトデータとしてメモリ内に格納されます。

* session._conn._cursor.upload_stream(input_stream, path): session オブジェクト内のデータベース接続情報を使用して、データウェアハウス内の内部ステージにモデルをアップロードします。具体的には、upload_stream メソッドが input_stream のバイナリデータを指定された path にアップロードします。path は、ステージ内のファイルのパスを指定します。

* return: モデルの保存とアップロードが成功した場合、関数は何も返さずに終了します。モデルの保存およびアップロードが失敗した場合、エラーハンドリングを追加することができます。

この関数の主要な目的は、学習済みモデルをSnowflakeデータウェアハウス内のステージに保存することです。データウェアハウス内のステージにモデルを保存することで、データベース内でモデルを使用して予測や分析を実行する際に、モデルへのアクセスが容易になります。モデルをデータウェアハウス内に保存することは、データ処理パイプラインや自動化タスクの一部として役立つことがあります。

<div style="border-radius: 10px; border: #DEB887 solid; padding: 15px; background-color: #FFFAF0; font-size: 100%; text-align: left">
<h3 align="left"><font color='#3498DB'></font></h3>
    
```Python
# 実行関数
def main(session: snowpark.Session):
    # ワインデータセットのロードと整形
    wine = load_wine()
    df_local = pd.DataFrame(wine.data, columns=wine.feature_names)
    df_local['class'] = [wine.target_names[i] for i in wine.target]
    df_local.columns = [x.upper() for x in df_local.columns]
    df_snowpark = session.create_dataframe(df_local)
    
    # 学習、検証、テストデータに分割し、テーブルに保存
    weights = [0.7, 0.2, 0.1]
    dfsp_train, dfsp_val, dfsp_test = df_snowpark.random_split(weights, seed=42)
    
    dfsp_train.write.mode('overwrite').saveAsTable('wine_train')
    dfsp_val.write.mode('overwrite').saveAsTable('wine_validation')
    dfsp_test.write.mode('overwrite').saveAsTable('wine_test')
    # 学習、検証データの設定
    Xtrain = dfsp_train.drop('class').to_pandas() 
    ytrain = dfsp_train.select('class').to_pandas()
    Xvalid = dfsp_val.drop('class').to_pandas()
    yvalid = dfsp_val.select('class').to_pandas()
    
    # 学習の実行
    model = RandomForestClassifier(n_estimators=10, criterion='gini', max_depth=5, random_state=42)
    model.fit(Xtrain, ytrain)
    # 検証データを使用し、モデルの精度検証
    predictions = model.predict(Xvalid)
    accuracy = accuracy_score(yvalid, predictions)
    # accuracyの評価結果をresultで返す
    result = session.create_dataframe([str(accuracy)], schema=['accuracy'])
    # 使用した検証データと予測結果をテーブルに保存
    df_tmp = pd.concat([dfsp_val.to_pandas(), pd.DataFrame(predictions , columns=['PREDICTION']) ], axis=1)
    dfsp_pred = session.create_dataframe(df_tmp)
    dfsp_pred.write.mode('overwrite').saveAsTable('wine_valid_pred')
    # モデルを内部ステージに保存
    model_stage = 'wine_models'
    query = f"""create or replace stage {model_stage}
             directory = (enable = true)
             copy_options = (on_error='skip_file')"""
    session.sql(query).collect()
    # 作成したステージにモデルファイルを保存
    save_file(session, model, f'@{model_stage}/predict_class.joblib')
    
    return result

<div style="border-radius: 10px; border: #DEB887 solid; padding: 15px; background-color: #EFGG0F; font-size: 95%; text-align: left">
<h3 align="left"><font color='#3498DB'>🦄 Notes:</font></h3>


| コードの部分                    | 機能                                                                                   |
|----------------------------------|---------------------------------------------------------------------------------------|
| `model_stage = 'wine_models'`    | ステージの名前を定義します。内部ステージにモデルを保存するためのステージの名前を指定します。 |
| `query`                          | SQLクエリを文字列として定義します。これにより、内部ステージを作成するクエリが生成されます。    |
| `f"""..."""`                    | フォーマット文字列リテラル（f文字列）を使用して、クエリ内に変数や式を埋め込みます。`model_stage` の値を埋め込んでステージ名を指定します。 |
| `create or replace stage {model_stage}...` | ステージを作成または置き換えるSQLクエリです。`model_stage` で指定されたステージ名の内部ステージを作成または置き換えます。 |
| `directory = (enable = true)`     | ステージのディレクトリの設定を指定します。`enable = true` はステージの有効化を意味します。    |
| `copy_options = (on_error='skip_file')` | ファイルのコピー時のオプションを設定します。ここでは、ファイルのコピー時にエラーが発生した場合、'skip_file' オプションを使用してファイルをスキップする設定が指定されています。 |
| `session.sql(query).collect()`   | Snowflakeデータベースに対してSQLクエリを実行します。`query` のSQLクエリを実行し、結果を収集（collect）します。     |


<div style="border-radius: 10px; border: #DEB887 solid; padding: 15px; background-color: #FFFAF0; font-size: 100%; text-align: left">
<h3 align="left"><font color='#3498DB'></font></h3>
    
```Python
# 内部ステージからモデルの読み込み・推論まで
import snowflake.snowpark as snowpark
from snowflake.snowpark.session import Session
from snowflake.snowpark.files import SnowflakeFile
import joblib
import pandas as pd
# モデル準備
from sklearn.ensemble import RandomForestClassifier
# 実行関数
def main(session: snowpark.Session): 
    # 内部ステージからモデルをロード
    model_stage = 'wine_models'
    with SnowflakeFile.open(f'@{model_stage}/predict_class.joblib','rb', require_scoped_url = False) as file:
        model = joblib.load(file)
    # テストデータの読み込み
    dfsp_test = session.table('wine_test') 
    inputs = dfsp_test.drop('class').to_pandas()
    
    # モデルによる推論
    predictions = model.predict(inputs)
    # 推論結果の出力
    df_tmp = pd.concat([dfsp_test.to_pandas(), pd.DataFrame(predictions , columns=['PREDICTION']) ], axis=1)
    dfsp_test_pred = session.create_dataframe(df_tmp)
    dfsp_test_pred.write.mode('overwrite').saveAsTable('wine_test_pred')
    
    return dfsp_test_pred

<div style="border-radius: 10px; border: #DEB887 solid; padding: 15px; background-color: #EFGG0F; font-size: 95%; text-align: left">
<h3 align="left"><font color='#3498DB'>🦄 Notes:</font></h3>


提供されたコードは、Snowflakeデータベース内で機械学習モデルを読み込み、テストデータに対する予測を実行し、その結果を新しいデータフレームとして保存するプロセスを実行しています。以下に関連する部分とそれぞれの機能を説明します：

| コード | 説明 |
|-----------|-----------------------------------|
| `import` ステートメント | 必要なライブラリとモジュールをインポートします。Snowflakeの関連モジュールやデータ操作のためのライブラリ、`joblib`、`pandas`、`RandomForestClassifier` などが含まれています。 |
| `def main(session: snowpark.Session):` | `main` 関数の定義を開始します。この関数は、Snowflakeのセッションオブジェクトを引数として受け取ります。 |
| `model_stage = 'wine_models'` | モデルの保存場所を指定する変数 `model_stage` を設定します。この変数にはモデルが保存されたステージの名前が格納されていると仮定されています。 |
| `with SnowflakeFile.open(f'@{model_stage}/predict_class.joblib','rb', require_scoped_url = False) as file:` | Snowflakeのステージからモデルファイルを読み込みます。`@{model_stage}/predict_class.joblib` はモデルファイルのパスを指定しています。 |
| `model = joblib.load(file)` | `joblib` ライブラリを使用してモデルファイルを読み込み、`model` 変数にモデルオブジェクトを格納します。 |
| `dfsp_test = session.table('wine_test')` | Snowflakeデータベース内の 'wine_test' テーブルを `dfsp_test` データフレームとして読み込みます。 |
| `inputs = dfsp_test.drop('class').to_pandas()` | 'wine_test' テーブルから 'class' 列を除いたデータを `inputs` として取得し、Pandasデータフレームに変換します。 |
| `predictions = model.predict(inputs)` | モデルを使用して `inputs` データに対する予測を行い、その結果を `predictions` に格納します。 |
| `df_tmp = pd.concat([dfsp_test.to_pandas(), pd.DataFrame(predictions, columns=['PREDICTION'])], axis=1)` | 元のテストデータフレーム (`dfsp_test`) と予測結果を結合して新しいデータフレーム `df_tmp` を作成します。 |
| `dfsp_test_pred = session.create_dataframe(df_tmp)` | 新しいデータフレーム `df_tmp` を Snowflake データベースに保存する新しいデータフレーム `dfsp_test_pred` を作成します。 |
| `dfsp_test_pred.write.mode('overwrite').saveAsTable('wine_test_pred')` | 新しいデータフレーム `dfsp_test_pred` を 'wine_test_pred' という名前のテーブルとして保存します。 |
| `return dfsp_test_pred` | 最終的なデータフレーム `dfsp_test_pred` を返します。 |

このコードは、Snowflakeデータベース内のモデルをロードし、テストデータに対する予測を行い、その予測結果を新しいテーブルに保存するタスクを実行するための関数です。

<div style="border-radius: 10px; border: #DEB887 solid; padding: 15px; background-color: #FFFAF0; font-size: 100%; text-align: left">
<h3 align="left"><font color='#3498DB'>コードのリメイク1</font></h3>

* 上記のコードをリメイクしてみました。
* 以下はSnowflake Python WorksheetsでSARIMAXモデルを使って分析する例です。
* 変数名、データフレーム名は変数列'DATE'のみ明示的にしており、他は任意です。
  
```Python
# The Snowpark package is required for Python Worksheets. 
# You can add more packages by selecting them using the Packages control and then importing them.

import snowflake.snowpark as snowpark

from snowflake.snowpark.functions import month,year,col,sum
import pandas as pd
import numpy as np
import statsmodels.api as sm
import statsmodels.tsa as tsa

def main(session: snowpark.Session): 
    # Your code goes here, inside the "main" handler.
    tableName = '-------------------'
    dataframe = session.table(tableName).filter(col("---------") == '---')
    # Spark DataFrameからPandas DataFrameに変換
    df = dataframe.toPandas()
    # 列名を指定
    column_names = ['DATE', '-----', '-------------']
    column_names = np.array(column_names)
    
    df = pd.DataFrame(df, columns=column_names)
    df['DATE'] = pd.to_datetime(df['DATE'])
   
    # State Space Modelの構築
    mod = sm.tsa.statespace.SARIMAX(df['---------'], 
                                order=(1, 1, 1),  # 自己回帰次数、差分次数、移動平均次数
                                seasonal_order=(1, 1, 1, 7),  # 季節性の自己回帰次数、差分次数、移動平均次数、周期（週次データの場合は7）
                                enforce_stationarity=False,
                                enforce_invertibility=False)

    # モデルの適合
    results = mod.fit()
    # パラメータの推定値を取得
    params = results.params

    # パラメータをPandas DataFrameに変換
    params_df = params.to_frame(name='Parameter_Estimate')

    # パラメータ名をインデックスから列に移動
    params_df.reset_index(inplace=True)
    params_df = params_df.rename(columns={'index': 'Parameter_Name'})


    result = session.create_dataframe(params_df)
    return result

<div style="border-radius: 10px; border: #DEB887 solid; padding: 15px; background-color: #FFFAF0; font-size: 100%; text-align: left">
<h3 align="left"><font color='#3498DB'>コードのリメイク2</font></h3>
    
```Python
# The Snowpark package is required for Python Worksheets. 
# You can add more packages by selecting them using the Packages control and then importing them.

import snowflake.snowpark as snowpark

from snowflake.snowpark.functions import month,year,col,sum
import pandas as pd
import numpy as np
import statsmodels.api as sm
import statsmodels.tsa as tsa

def main(session: snowpark.Session): 
    # Your code goes here, inside the "main" handler.
    tableName = '-------------------'
    dataframe = session.table(tableName).filter(col("---------") == '---')
    # Spark DataFrameからPandas DataFrameに変換
    df = dataframe.toPandas()
    # 列名を指定
    column_names = ['DATE', '-----', '-------------']
    column_names = np.array(column_names)
    
    df = pd.DataFrame(df, columns=column_names)
    df['DATE'] = pd.to_datetime(df['DATE'])
   
    # State Space Modelの構築
    mod = sm.tsa.statespace.SARIMAX(df['---------'], 
                                order=(1, 1, 1),  # 自己回帰次数、差分次数、移動平均次数
                                seasonal_order=(1, 1, 1, 7),  # 季節性の自己回帰次数、差分次数、移動平均次数、周期（週次データの場合は7）
                                enforce_stationarity=False,
                                enforce_invertibility=False)

    # モデルの適合
    results = mod.fit()
    # パラメータの推定値を取得
    params = results.params

    # パラメータをPandas DataFrameに変換
    params_df = params.to_frame(name='Parameter_Estimate')

    # パラメータ名をインデックスから列に移動
    params_df.reset_index(inplace=True)
    params_df = params_df.rename(columns={'index': 'Parameter_Name'})


    result = session.create_dataframe(params_df)
    return result