# Part 1 デモ: PySparkによるETL処理

このノートブックでは、PySparkを使った命令型のETL処理を体験します。

**学習内容:**
- DataFrameの基本操作
- 変換処理（Transformation）
- アクション（Action）
- メダリオンアーキテクチャに沿ったETL実装

## 1. サンプルデータの確認

Databricksには `samples.nyctaxi.trips` というNYCタクシーのサンプルデータが用意されています。

In [0]:
# サンプルデータを読み込み
df = spark.table("samples.nyctaxi.trips")

# データを表示（最初の10行）
display(df)

In [0]:
# 行数を確認
print(f"総レコード数: {df.count():,} 件")

In [0]:
# スキーマ（カラム名と型）を確認
df.printSchema()

### データの説明

| カラム名 | 型 | 説明 |
|---------|-----|------|
| tpep_pickup_datetime | timestamp | 乗車日時 |
| tpep_dropoff_datetime | timestamp | 降車日時 |
| trip_distance | double | 走行距離（マイル） |
| fare_amount | double | 運賃（ドル） |
| pickup_zip | int | 乗車地点の郵便番号 |
| dropoff_zip | int | 降車地点の郵便番号 |

## 2. 変換処理（Transformation）の基本

変換処理は「新しいDataFrameを返す」操作です。  
**重要**: 変換処理だけでは実行されません（遅延評価）

### 2.1 filter() - 条件に合う行を抽出

In [0]:
from pyspark.sql.functions import col

# 運賃が0より大きいデータのみ抽出
df_filtered = df.filter(col("fare_amount") > 0)

# この時点ではまだ実行されていない！
print("フィルタ条件を設定しました（まだ実行されていません）")

In [0]:
# show() を呼ぶと実行される（これがアクション）
df_filtered.show(5)

### 2.2 select() - 特定のカラムを選択

In [0]:
# 必要なカラムのみ選択
df_selected = df.select(
    "tpep_pickup_datetime",
    "tpep_dropoff_datetime", 
    "trip_distance",
    "fare_amount"
)

display(df_selected)

### 2.3 withColumn() - 新しいカラムを追加

In [0]:
from pyspark.sql.functions import col, round, unix_timestamp

# 乗車時間（分）のカラムを追加
df_with_duration = df.withColumn(
    "trip_duration_min",
    round((unix_timestamp("tpep_dropoff_datetime") - unix_timestamp("tpep_pickup_datetime")) / 60, 2)
)

display(df_with_duration)

### 2.4 メソッドチェーン - 複数の変換を連結

In [0]:
# 複数の変換を一度に記述（これでも実行はされない）
df_transformed = (
    df
    .filter(col("fare_amount") > 0)
    .filter(col("trip_distance") > 0)
    .select("tpep_pickup_datetime", "trip_distance", "fare_amount")
    .withColumn("fare_per_mile", round(col("fare_amount") / col("trip_distance"), 2))
)

# アクションを呼ぶまで実行されない
print("変換処理を定義しました")

In [0]:
# show() で実行
df_transformed.show(10)

## 3. 集計処理

### 3.1 groupBy() と集計関数

In [0]:
from pyspark.sql.functions import count, avg, sum, max, min, to_date

# 日付ごとの集計
df_daily_stats = (
    df
    .filter(col("fare_amount") > 0)
    .withColumn("pickup_date", to_date("tpep_pickup_datetime"))
    .groupBy("pickup_date")
    .agg(
        count("*").alias("trip_count"),
        round(avg("fare_amount"), 2).alias("avg_fare"),
        round(avg("trip_distance"), 2).alias("avg_distance"),
        round(sum("fare_amount"), 2).alias("total_revenue")
    )
    .orderBy("pickup_date")
)

display(df_daily_stats)

## 4. メダリオンアーキテクチャでETLを実装

Bronze → Silver → Gold の3層でデータを加工していきます。

### 4.1 スキーマの準備

In [0]:
# スキーマを作成（workspaceカタログを使用）
spark.sql("CREATE SCHEMA IF NOT EXISTS workspace.part1_demo")
spark.sql("USE workspace.part1_demo")

print("スキーマ workspace.part1_demo を使用します")

### 4.2 Bronze層 - 生データをそのまま保存

In [0]:
# Bronzeテーブル: ソースデータをそのまま保存
bronze_df = spark.table("samples.nyctaxi.trips")

# テーブルとして保存
bronze_df.write.mode("overwrite").saveAsTable("bronze_trips")

print("✅ bronze_trips テーブルを作成しました")

In [0]:
# 確認
spark.table("bronze_trips").show(5)

### 4.3 Silver層 - クレンジング処理

In [0]:
from pyspark.sql.functions import unix_timestamp

# Silverテーブル: クレンジング済みデータ
silver_df = (
    spark.table("bronze_trips")
    # 不正データを除外
    .filter(col("fare_amount") > 0)
    .filter(col("trip_distance") > 0)
    # 乗車時間（分）を計算して追加
    .withColumn(
        "trip_duration_min",
        round((unix_timestamp("tpep_dropoff_datetime") - unix_timestamp("tpep_pickup_datetime")) / 60, 2)
    )
    # マイルあたりの運賃を計算
    .withColumn(
        "fare_per_mile",
        round(col("fare_amount") / col("trip_distance"), 2)
    )
)

# テーブルとして保存
silver_df.write.mode("overwrite").saveAsTable("silver_trips")

print("✅ silver_trips テーブルを作成しました")

In [0]:
# 確認: Bronzeとの件数比較
bronze_count = spark.table("bronze_trips").count()
silver_count = spark.table("silver_trips").count()
removed = bronze_count - silver_count

print(f"Bronze: {bronze_count:,} 件")
print(f"Silver: {silver_count:,} 件")
print(f"除外されたレコード: {removed:,} 件 ({removed/bronze_count*100:.1f}%)")

### 4.4 Gold層 - ビジネス用に集計

In [0]:
# Goldテーブル: 日別サマリー
gold_df = (
    spark.table("silver_trips")
    .withColumn("date", to_date("tpep_pickup_datetime"))
    .groupBy("date")
    .agg(
        count("*").alias("trip_count"),
        round(sum("fare_amount"), 2).alias("total_revenue"),
        round(avg("fare_amount"), 2).alias("avg_fare"),
        round(avg("trip_distance"), 2).alias("avg_distance"),
        round(avg("trip_duration_min"), 2).alias("avg_duration_min")
    )
    .orderBy("date")
)

# テーブルとして保存
gold_df.write.mode("overwrite").saveAsTable("gold_daily_stats")

print("✅ gold_daily_stats テーブルを作成しました")

In [0]:
# 結果を確認
display(spark.table("gold_daily_stats"))

## 5. 作成したテーブルの確認

In [0]:
# 作成したテーブル一覧
spark.sql("SHOW TABLES").show()

## まとめ

### 学んだこと
- **DataFrame**: Sparkの基本データ構造
- **変換処理**: filter(), select(), withColumn(), groupBy()
- **アクション**: show(), count(), display(), write()
- **遅延評価**: アクションを呼ぶまで実行されない
- **メダリオンアーキテクチャ**: Bronze → Silver → Gold

### 次のステップ
Part 2では、この処理を「宣言型」で書き直し、より簡潔に実装する方法を学びます。