## Avalanche (架空のウィンタースポーツ用品会社)

![IMAGE](https://lh3.googleusercontent.com/pw/AP1GczPiLLf_4vKqdLeP8xr1GYa4eMa36fYztaHgEmiV94zrOvEsvIcPNWQnr85TIbzktK-fWbx32HgSryaWaaWJZjV35JU8E3krcwepmeQoW19s7UyloBZ4cOMTe-a0zCEz8hRMV1Kg4TM7cyEj13WdVAO2=w960-h540-s-no-gm?authuser=0)


Avalancheの注文履歴・出荷データを, [Snowflake 上で動作する pandas](https://docs.snowflake.com/en/developer-guide/snowpark/python/pandas-on-snowflake) を使って分析します

In [None]:
# Snowpark Pandas API
import modin.pandas as spd
# Import the Snowpark pandas plugin for modin
import snowflake.snowpark.modin.plugin
import streamlit as st

import snowflake.snowpark.functions as F
from snowflake.snowpark.context import get_active_session

In [None]:
# Snowflake のアクティブなセッション（現在接続中のセッション）を取得する
# これによって、以降の処理で Snowflake に対して SQL 実行やデータ操作ができるようになる
session = get_active_session()

# セッションに「クエリタグ (query tag)」を設定する
# クエリタグとは、Snowflake 上で実行した SQL クエリに「ラベル」を付ける仕組み
# これにより、モニタリングやトラブルシューティングで
# 「どのアプリから来たクエリか」「どのハンズオン教材からの実行か」などを追跡できる
session.query_tag = {
    "origin": "sf_devrel",          # クエリの発行元（ここでは Snowflake Developer Relations の意味）
    "name": "de_100_vhol",          # このハンズオンや演習の名前
    "version": {                    # バージョン情報
        "major": 1,
        "minor": 0
    },
    "attributes": {                 # 追加の属性情報（カスタムラベルのようなもの）
        "is_quickstart": 1,         # Quickstart チュートリアルからの実行であることを示す
        "source": "notebook",       # Jupyter Notebook や Snowflake Notebook からの実行であることを示す
        "vignette": "snowpark_pandas"  # この教材のシナリオ名（Snowpark + pandas のハンズオンであること）
    }
}


# ✨ ポイント
# --------------
# get_active_session() → すでに開いている Snowflake との接続を取ってくる関数。
# query_tag → Snowflake に「このクエリは何のために動いたのか」を残せる便利なメタ情報。運用や監査で役立ちます。

### TODO: ダウンロードした, 出荷データ(shipping-logs.csv)を Notebooks ワークスペースに読み込む
- 画面左側の[➕]ボタンからファイルをアップロード

In [None]:
# Snowpark pandas（spd）を使って CSV ファイルを読み込む
# 'shipping-logs.csv' という名前のCSVファイルを対象にしている
# CSVの中に 'shipping_date' という列があり、それを日付型（datetime型）として扱うよう指定している
shipping_logs_mdf = spd.read_csv(
    'shipping-logs.csv',        # 読み込むCSVファイルの名前
    parse_dates=['Shipping Date']  # この列を「文字列」ではなく「日付」として読み込む
)

# 読み込んだデータ（shipping_logs_mdf）を表示する
# shipping_logs_mdf は pandas.DataFrame と同じように扱えるオブジェクト
shipping_logs_mdf


# ✨ ポイント
# --------------
# ****
#
# parse_dates=['Shipping Date'] → もし指定しないと "2025-08-17" のような日付も文字列（ただのテキスト）として扱われる。
# → ここで指定することで「日付」として認識され、後で「日付ごとの集計」や「期間でフィルタ」などが簡単にできる。
#
# shipping_logs_mdf → 読み込んだデータを DataFrame 形式で保持する変数。
# shipping_logs_mdf → 変数名の末尾 mdf は「modin dataframe（＝Snowpark pandasのDataFrame）」の略

### TODO: ダウンロードした, 注文履歴データ(order-history.csv)を Notebooks ワークスペースに読み込む
- 画面左側の[➕]ボタンからファイルをアップロード

In [None]:
# Snowpark pandas（spd）を使って CSV ファイルを読み込む
# 'order-history.csv' という名前のCSVファイルを対象にしている
# CSVの中に 'Date' という列があり、それを日付型（datetime型）として扱うように指定している
order_history_mdf = spd.read_csv(
    'order-history.csv',   # 読み込むCSVファイルの名前
    parse_dates=['Ordered Date']   # 'Date' 列を文字列ではなく「日付」として扱う
)

# 読み込んだデータ（order_history_mdf）を表示する
# order_history_mdf は pandas.DataFrame と同じように扱えるオブジェクト
order_history_mdf


# ✨ ポイント
# --------------
# ****
#
# parse_dates=['Date'] → 「注文日（Date）」の列を日付型にしておくと、後で「月ごとの集計」「特定の期間の抽出」などが簡単にできる。

In [None]:
# order_history_mdf の列名を分かりやすく変更する
# ****(columns={...}) で「元の列名 : 新しい列名」を指定する

order_history_mdf = order_history_mdf.rename(columns = {
    'Order ID': 'order_id',              # 注文ID → order_id
    'Customer ID': 'customer_id',        # 顧客ID → customer_id
    'Product ID': 'product_id',          # 商品ID → product_id
    'Product Name': 'product_name',      # 商品名 → product_name
    'Quantity Ordered': 'quantity_ordered',  # 注文数 → quantity_ordered
    'Price': 'price',                    # 単価 → price
    'Total Price': 'total_price',        # 合計金額 → total_price
    'Ordered Date': 'date'                       # 日付 → date
})

# 列名が正しく変更されたかを確認する
order_history_mdf.columns

# ✨ ポイント
#--------------
# ****(columns={...}) → 辞書形式（キー: 値）で「古い名前 → 新しい名前」に変換する。
#
# スネークケース（例: order_id）は Python で一般的な書き方で、プログラムで扱いやすい。
#
# この処理をしておくと、後でコードを書くときに order_history_mdf["order_id"] のように呼び出しやすくなる。

### 価格カラムから $ 記号を取り除いて整理する

In [None]:
# 文字列で表現された価格（例: "$19.99"）を数値に変換する関数
def clean_price(price_str):
    # 価格の文字列から "$" 記号を取り除き、前後の余分な空白も削除する
    # 例: " $19.99 " → "19.99"
    cleaned = price_str.replace('$', '').strip()
    
    # 文字列になっている数値を float型（小数点を持つ数値）に変換する
    # 例: "19.99" → 19.99
    return float(cleaned)


# ✨ ポイント
# --------------
# **** → $ を空文字に置き換えて削除する。
#
# .strip() → 文字列の前後にある余計なスペースや改行を削除する。
#
# float() → 文字列を「実数」に変換する。計算で使えるようになる。

In [None]:
# ---- 価格カラムを数値に変換する処理 ----

# 'price' 列に対して clean_price 関数を適用する
# これにより "$19.99" のような文字列が 19.99 という float型の数値になる
order_history_mdf['price'] = order_history_mdf['price'].apply(clean_price)

# 'total_price' 列に対しても同じく clean_price を適用する
# これで合計金額も数値として扱えるようになる
order_history_mdf['total_price'] = order_history_mdf['total_price'].apply(clean_price)


# ---- 変換後のデータ型を確認する処理 ----

# price 列のデータ型を表示（float になっていればOK）
print("\nPrice column data type:", order_history_mdf['price'].dtype)

# total_price 列のデータ型を表示（こちらも float になっていればOK）
print("Total price column data type:", order_history_mdf['total_price'].dtype)



# ✨ ポイント
# --------------
# .apply(clean_price) → 各行の値に対して clean_price 関数を実行する。
#
# データ型の確認 (dtype)
# 変換前 → object（文字列）
# 変換後 → float64（小数点付き数値）
#
# こうしておくことで、後から「平均」「合計」「グラフ化」などの数値演算ができるようになる。


In [None]:
# 実際に $ が消えて数値化されているかを表で確認
order_history_mdf.head()


# ✨ ポイント
# --------------
# .head() → データフレームの最初の5行を表示して内容を確認する
# 読み込みや列名変更、数値変換が正しくできているかチェックするのに便利

### 製品ごとの注文数を計算する：order_history と shipping_logs を結合する

In [None]:
# order_history_mdf の列名を分かりやすく変更する
# ****(columns={...}) で「元の列名 : 新しい列名」を指定する

shipping_logs_mdf = shipping_logs_mdf.rename(columns = {
    'Order ID': 'order_id',              # 注文ID → order_id
    'Shipping Date': 'shipping_date',    # 発送日 → shipping_date
    'Carrier': 'carrier',                # 配送業者 → carrier
    'Tracking Number': 'trucking_number',# 追跡番号 → trucking_number
    'Latitude': 'latitude',              # 緯度 → lattitude
    'Longitude': 'longitude',            # 経度 → longitude
    'Shipping Status': 'status'         # ステータス → status
})

# 列名が正しく変更されたかを確認する
shipping_logs_mdf.columns

In [None]:
# ---- 注文データと出荷データを結合 ----

# order_history_mdf（注文データ）と shipping_logs_mdf（出荷データ）を
# 'order_id' 列をキーにして結合（マージ）する

order_shipping_mdf = spd.merge(
    order_history_mdf,      # 左側のデータフレーム（注文履歴）
    shipping_logs_mdf,      # 右側のデータフレーム（出荷ログ）
    on='order_id',          # 結合キーとなる列
    how='inner'             # 内部結合（両方に存在するデータのみ）
)

# 結合後のデータフレームの先頭5行を表示して確認
order_shipping_mdf.head(5)


# ✨ ポイント
# --------------
# spd.****() → pandas の **** と同じように使えるが、Snowpark pandas 上で動作する
#
# on='order_id' → 「注文ID」を使ってデータを紐付ける
#
# how='****'：
#     両方のテーブルに存在する注文だけを残す
#     片方だけにある注文は削除される
#     結合すると、注文情報と出荷情報が 1行にまとめられる ので分析しやすくなる
#         例) 「注文から出荷までの日数」や「商品ごとの売上と出荷状況」
#
# 外部結合（how='left' や how='right'）を使うと、片方にしかないデータも残せる


In [None]:
# ---- 商品ごとの注文件数を集計 ----

# 'product_name' 列でグループ化して、注文件数を数える
# ****() は各グループの行数（＝注文数）をカウントする
# reset_index(name='order_count') で結果をデータフレーム形式に戻し、列名を 'order_count' に設定
product_counts_mdf = order_shipping_mdf.groupby('product_name').size().reset_index(name='order_count')

# ---- 注文件数の多い順に並べ替え ----

# sort_values() で 'order_count' 列を降順（ascending=False）に並べる
product_counts_mdf = product_counts_mdf.sort_values('order_count', ascending=False)

# ---- 結果を表示 ----
print("\nProduct Order Counts:")
st.dataframe(product_counts_mdf)



# ✨ ポイント
# --------------
# ****('product_name') → 同じ商品ごとにまとめる 
#
# .****() → 各商品の注文数をカウント
#
# .reset_index(name='order_count') → 集計結果をデータフレームとして整形し、列名を order_count に変更
#
# .sort_values(..., ascending=False) → 注文件数の多い順に並べ替える
# --------------
# これにより、どの商品が人気か（注文が多いか）が一目でわかる
# もし上位5商品だけ見たい場合は product_counts_mdf.head(5) と書くと便利


### 注文の配送ステータスごとにピボットする

In [None]:
# ---- 商品ごとの注文ステータス別集計 ----

# **** を使って集計
# index='product_name' → 行に商品名を設定
# columns='status' → 列に注文ステータス（例: shipped, pending, cancelled）を設定
# values='order_id' → 注文IDを数える対象にする
# aggfunc='count' → 各セルに注文数をカウント
# fill_value=0 → データがない場合は 0 を埋める
product_status_pivot_mdf = order_shipping_mdf.pivot_table(
    index='product_name',
    columns='status',
    values='order_id',
    aggfunc='count',
    fill_value=0
)

# ---- 合計注文数の列を追加 ----

# 行ごとの合計を計算して 'Total_Orders' 列として追加
product_status_pivot_mdf['Total_Orders'] = product_status_pivot_mdf.sum(axis=1)

# ---- 合計注文数の多い順に並べ替え ----

product_status_pivot_mdf = product_status_pivot_mdf.sort_values('Total_Orders', ascending=False)

# ---- 結果を表示 ----
print("\nProduct Orders by Status:")
st.dataframe(product_status_pivot_mdf)

# ✨ ポイント
# --------------
# **** → 行・列を指定して集計表（ピボットテーブル）を作る

# aggfunc='count' → 注文数をカウント

# sum(axis=1) → 行方向の合計を計算して「合計注文数」を追加

# この表を作ると、
# 商品ごとのステータス別の注文数が一目でわかる
# 合計注文数で人気商品をすぐ把握できる

## Avalanche社は、各製品に対する顧客レビューについても理解したいと考えています。  

![IMAGE](https://lh3.googleusercontent.com/pw/AP1GczMuP-pHWhjNDtQwRpMYm0FKey9xlDfRMvcSa6HhxnJrhG-oCs6ydlOhpCvR5VcNDjbFNRir_H4XsFaay-lehwzRV1pgKoB9DjJ31SduUCD2F1gwmZgG4SAM6vNseULS3tYZoW7taYzTW-gc5Lt-4gu3=w960-h540-s-no-gm?authuser=0)



この分析を [Snowpark DataFrame API](https://docs.snowflake.com/en/developer-guide/snowpark/python/working-with-dataframes) を使って実行してみましょう。

In [None]:
-- ---- データベースとスキーマの作成（Snowsight UI で実行する場合） ----
-- CREATE OR REPLACE DATABASE avalanche_db;
-- CREATE OR REPLACE SCHEMA avalanche_schema;

-- 既存データベースを使用する
USE DATABASE avalanche_db;

-- 既存スキーマを使用する
USE SCHEMA avalanche_schema;


-- ---- ファイルを格納するステージ（Stage）の作成 ----
-- Stage とは、Snowflake にデータを取り込む前に一時的にファイルを置いておく場所です
CREATE OR REPLACE STAGE avalanche_stage
  URL = 's3://sfquickstarts/misc/avalanche/csv/'  -- S3 バケットの場所を指定
  DIRECTORY = (ENABLE = TRUE AUTO_REFRESH = TRUE); -- ディレクトリ構造を有効化、自動更新ON

-- Stage 内のファイル一覧を確認
ls @avalanche_stage;


### 顧客レビューを Snowflake のテーブルに読み込む

In [None]:
-- ---- テーブルの作成 ----
-- customer_reviews という名前のテーブルを作成
-- 商品名、レビュー日、レビュー本文、感情スコアを保存する
CREATE OR REPLACE TABLE customer_reviews (
    product VARCHAR,          -- 商品名（文字列）
    date DATE,                -- レビュー日（DATE型）
    summary TEXT,             -- レビュー本文（TEXT型）
    sentiment_score FLOAT     -- 感情スコア（数値、小数点）
);


-- ---- CSV ファイルからデータをテーブルにロード ----
COPY INTO customer_reviews
FROM @avalanche_stage/customer_reviews.csv   -- 先ほど作成した Stage 内の CSV を指定
FILE_FORMAT = (
    TYPE = CSV,                               -- CSV形式のファイル
    FIELD_DELIMITER = ',',                     -- カラム区切り文字はカンマ
    SKIP_HEADER = 1,                           -- 1行目はヘッダーなのでスキップ
    FIELD_OPTIONALLY_ENCLOSED_BY = '"',       -- 値が " " で囲まれている場合に対応
    TRIM_SPACE = TRUE,                         -- 前後の空白を削除
    NULL_IF = ('NULL', 'null'),               -- "NULL" または "null" は NULL として扱う
    EMPTY_FIELD_AS_NULL = TRUE                -- 空欄も NULL として扱う
);


In [None]:
# ---- Snowflake テーブルを Snowpark DataFrame として読み込む ----

# 'customer_reviews' テーブルを Snowpark DataFrame として取得
customer_reviews_sdf = session.table('customer_reviews')

# 取得した Snowpark DataFrame の内容を確認
customer_reviews_sdf


# ✨ ポイント
# --------------
# session.table('table_name')
#   Snowflake 上の既存テーブルを Snowpark DataFrame として扱う
#   pandas の DataFrame に似ているが、実際のデータは Snowflake にあり、クエリ実行時に必要な部分だけ取得する
#
# customer_reviews_sdf
#   この変数に Snowpark DataFrame が格納される
#   データの操作（フィルタリング、集計、結合など）を Snowflake 側で効率的に実行可能

# 💡 補足
# --------------
# Snowpark DataFrame は Lazy Evaluation（遅延評価） です
#   customer_reviews_sdf を定義しただけではまだデータは取得されない
#   データを確認したい場合は .show() や .to_pandas() などで明示的に取得する必要があります

In [None]:
product_sentiment_sdf = customer_reviews_sdf.group_by('PRODUCT') \
    .agg(F.round(F.avg('SENTIMENT_SCORE'),2).alias('AVG_SENTIMENT_SCORE')) \
    .sort(F.col('AVG_SENTIMENT_SCORE').desc())

# Display the results
print("\nAverage Sentiment Scores by Product:")
product_sentiment_sdf

## 📊 データの可視化

[Altair](https://altair-viz.github.io/)を使用して、データ分布をヒストグラムとして簡単に可視化できます。

In [None]:
import altair as alt

# 追加したパッケージ
import matplotlib.pyplot as plt

pdf = customer_reviews_sdf.to_pandas()
chart = alt.Chart(pdf, title='評価分布').mark_bar().encode(
    alt.X("SENTIMENT_SCORE", bin=alt.Bin(step=0.5)),
    y='count()'
)

st.altair_chart(chart)

チャートをカスタマイズして、カーネル密度推定（KDE）と中央値をプロットしたいとします。matplotlibを使用して価格分布をプロットできます。`.plot`コマンドは内部的に`scipy`を使用してKDEプロファイルを計算することに注意してください。これは、このチュートリアルの前半でパッケージとして追加したものです。

In [None]:
fig, ax = plt.subplots(figsize = (6,3))
plt.tick_params(left = False, right = False , labelleft = False) 

price = order_history_mdf["price"]
price.plot(kind = "hist", density = True, bins = 15)
price.plot(kind="kde", color='#c44e52')


# パーセンタイルを計算
median = price.median()
ax.axvline(median,0, color='#dd8452', ls='--')
ax.text(median,0.8, f'Median: {median:.2f}  ',
        ha='right', va='center', color='#dd8452', transform=ax.get_xaxis_transform())

# チャートを美しくする
plt.style.use("bmh")
plt.title("Price Distribution")
plt.xlabel("Price (Binned)")
left, right = plt.xlim()   
plt.xlim((0, right))  
# 目盛りと軸線を削除
ax.tick_params(left = False, bottom = False)
for ax, spine in ax.spines.items():
    spine.set_visible(False)

plt.show()

## サブクエリ/セル間の参照

セルに名前をつけて、後続のセルでその出力を参照することができます


Jinjaを利用して別のSQLセルからSQLテーブルを参照することで、CTEを簡素化することができます。

```sql
SELECT * FROM {{cell}}
```

In [None]:
select 
    product, 
    avg(sentiment_score) as avg_score, 
    min(sentiment_score) as min_score, 
    max(sentiment_score) as max_score
from customer_reviews
group by all;

In [None]:
select * from {{subqueries}}
WHERE avg_score > 0.5;

SQL結果にPythonから直接アクセスし、結果をpandas DataFrameに変換できます。🐼

```python
# SQLセルの出力をSnowpark DataFrameとしてアクセス
my_snowpark_df = sql_querying.to_df()
``` 

```python
# SQLセルの出力をpandas DataFrameに変換
my_df = sql_querying.to_pandas()
``` 

In [None]:
my_df = subqueries2.to_df()
my_df

## ステージパッケージの追加
使用したいPythonパッケージがAnacondaで利用できない場合は、パッケージをステージにアップロードし、ステージからインポートすることができます。ここでは、カスタムパッケージをノートブックにインポートする簡単な例を示します。

In [None]:
-- ステージの作成
CREATE OR REPLACE STAGE AVALANCHE_DB.AVALANCHE_SCHEMA.FILE DIRECTORY = (ENABLE = TRUE);

// Step3: 公開されているGitからスクリプトを取得 //
-- Git連携のため、API統合を作成する
CREATE OR REPLACE API INTEGRATION git_api_integration
  API_PROVIDER = git_https_api
  API_ALLOWED_PREFIXES = ('https://github.com/sfc-gh-skawakami/')
  ENABLED = TRUE;

-- GIT統合の作成
CREATE OR REPLACE GIT REPOSITORY GIT_INTEGRATION_FOR_HANDSON
  API_INTEGRATION = git_api_integration
  ORIGIN = 'https://github.com/sfc-gh-skawakami/sfc-jp-notebook_de_101.git';


ALTER GIT REPOSITORY GIT_INTEGRATION_FOR_HANDSON FETCH;

-- チェックする
ls @GIT_INTEGRATION_FOR_HANDSON/branches/main;

-- Githubからファイルを持ってくる
COPY FILES INTO @AVALANCHE_DB.AVALANCHE_SCHEMA.FILE FROM @GIT_INTEGRATION_FOR_HANDSON/branches/main/simple.zip;
ls @AVALANCHE_DB.AVALANCHE_SCHEMA.FILE;

`simple.zip`の内容

simple/__init__.py

```python
import streamlit as st

def greeting():
  return "Hello world!"

def hi():
  st.write(greeting())
```

In [None]:
import simple

simple.hi()