# Apache Sparkもくもく会ハンズオンノートブック

## 🎯 今日の目標
このもくもく会では、Apache Sparkの基礎を実際に手を動かしながら学んでいきます。大規模データ処理の世界への第一歩を踏み出しましょう！

### 📋 事前準備
- [Databricks Free Edition](https://qiita.com/taka_yayoi/items/33e9cfa7ca9ca9febe72)にログイン済み
- [サーバレスコンピュート](https://docs.databricks.com/aws/ja/compute/serverless/)が起動済み
- このノートブックをインポート済み

### 🚀 学習の流れ
1. Sparkの基本概念を理解
2. データフレームの操作を習得
3. 実データでの分析を実践
4. SQLとの連携を学習
5. Unity Catalogとの統合を理解

## 📚 第1部: Apache Sparkとは

### Sparkの誕生ストーリー
2009年、カリフォルニア大学バークレー校で誕生したApache Sparkは、ビッグデータ処理に革命をもたらしました。Databricksの創業者たちが開発したこのフレームワークは、従来のHadoop MapReduceと比較して、**中間結果をメモリーに保持することで100倍以上の高速化**を実現しました。

### 🐼 PandasとSparkの違い

多くの方が使い慣れているPandasと比較してSparkを理解しましょう：

| 特徴 | Pandas | Apache Spark |
|------|--------|-------------|
| **処理方式** | 単一マシン（シングルスレッド） | 分散処理（複数マシン・並列処理） |
| **データサイズ** | メモリに収まる範囲（通常数GB） | メモリを超える大規模データ（TB〜PB） |
| **処理速度** | 小規模データでは高速 | 大規模データで圧倒的に高速 |
| **実行タイミング** | 即座に実行（Eager Evaluation） | 遅延評価（Lazy Evaluation） |
| **データ構造** | DataFrame | DataFrame（分散） |
| **スケーラビリティ** | 垂直スケール（マシンスペック依存） | 水平スケール（ノード追加で対応） |
| **エラー処理** | エラー時は最初から | 障害耐性あり（自動リトライ） |

![](img/spark_pandas.png)

### なぜSparkを学ぶのか？
- **🚀 スピード**: メモリベースの処理により、大規模データを高速に処理
- **🎯 使いやすさ**: Pandasに似たDataFrame APIで学習曲線が緩やか
- **🔧 モジュール性**: SQL、機械学習、ストリーミング処理など多様な用途に対応
- **📈 拡張性**: データ量の増加に柔軟に対応可能

### 🔍 これから実行するコード
最初に、Databricksで自動的に作成されているSparkセッションを確認します。
Sparkセッションは、Sparkの全機能への入口となる重要なオブジェクトです。

In [0]:
# Sparkセッションの確認
# Databricksでは起動時に自動的に`spark`変数が作成されています
# これがSparkへのエントリーポイントとなります
spark

### 📊 Sparkのバージョンと設定を確認
現在使用しているSparkの環境情報を確認してみましょう。

In [0]:
# Sparkのバージョンを確認
# サーバレスコンピュートでは最新のSparkバージョンが使用されます
print(f"Sparkバージョン: {spark.version}")

# 現在のカタログとデータベースを確認
print(f"現在のカタログ: {spark.sql('SELECT current_catalog()').first()[0]}")
print(f"現在のデータベース: {spark.sql('SELECT current_database()').first()[0]}")

## 🏗️ 第2部: Sparkの基本アーキテクチャを詳しく理解する

### 📐 Sparkクラスターの構成要素

![](img/spark_architecture.png)

Sparkは**分散処理システム**として設計されており、以下の主要コンポーネントから構成されています：

#### 1. ドライバー（Driver）
- **役割**: アプリケーション全体の司令塔
- **責任**:
  - ユーザーコードをタスクに変換
  - タスクのスケジューリング
  - エグゼキューターの監視
  - 実行計画の最適化

#### 2. エグゼキューター（Executor）
- **役割**: 実際のデータ処理を実行
- **責任**:
  - タスクの実行
  - データの保持（メモリまたはディスク）
  - 中間結果の保存
  - ドライバーへの結果返却

#### 3. クラスターマネージャー
- **役割**: リソースの管理と割り当て
- **種類**: Databricks（サーバレス）、YARN、Mesos、Kubernetes等

### 🔄 処理の流れ

```
[ユーザーコード] 
    ↓
[ドライバー] → 論理実行計画 → 物理実行計画 → タスク生成
    ↓
[クラスターマネージャー] → リソース割り当て
    ↓
[エグゼキューター群] → 並列実行
    ↓
[結果の集約]
```

### 🎯 重要な概念

1. **パーティション**: データを分割した単位（並列処理の基本単位）
2. **タスク**: パーティションに対する処理の単位
3. **ステージ**: シャッフルで区切られたタスクのグループ
4. **ジョブ**: アクションによって起動される全体の処理

### 🔍 アーキテクチャの動作原理

Sparkでデータ処理を実行する際の内部動作を理解しましょう：

#### 例: 1000個の数値を処理する場合

```python
# データフレームを4つのパーティションで作成
numbers_df = spark.range(0, 1000, 1, numPartitions=4)
```

このコードが実行されると、以下のような処理が行われます：

1. **パーティション分割**
   - 0-249: パーティション1 → エグゼキューター1で処理
   - 250-499: パーティション2 → エグゼキューター2で処理
   - 500-749: パーティション3 → エグゼキューター3で処理
   - 750-999: パーティション4 → エグゼキューター4で処理

2. **並列実行**
   - 4つのタスクが同時に実行される
   - 各エグゼキューターが独立して処理
   - 処理時間は約1/4に短縮

3. **結果の集約**
   - 各エグゼキューターの結果をドライバーが収集
   - 最終的な結果を生成

#### サーバレス環境での最適化

Databricksのサーバレスコンピュートでは：
- パーティション数が自動的に最適化される
- リソースが動的に割り当てられる
- 負荷に応じてエグゼキューターが自動スケール"

### 📝 理解度チェック
- Q1: ドライバーの主な役割は何でしょうか？
- Q2: エグゼキューターは何を実行しますか？
- Q3: パーティションとタスクの関係は？

これらの質問に答えられるようになれば、Sparkアーキテクチャの基本を理解できています！

## 🎬 第3部: 最初のSparkデータフレームを作成

### 📖 トランスフォーメーションとアクションの理解

Sparkの処理は大きく2種類に分類されます：

1. **トランスフォーメーション（Transformation）**
   - データの変換処理を定義（まだ実行されない）
   - 例: `select()`, `filter()`, `groupBy()`
   
2. **アクション（Action）**
   - 実際に処理を実行して結果を返す
   - 例: `count()`, `show()`, `collect()`

![](img/transform_action.png)

### Step 1: シンプルなデータフレームの作成
まず、連番データを持つデータフレームを作成します。

In [0]:
# 大規模データを想定して、1から100万までの数値を生成
# range()メソッドは「トランスフォーメーション」
# この時点ではデータは生成されず、処理の定義のみが作成されます
first_df = spark.range(1000000)

# データフレームの型を確認
print(f"データフレームの型: {type(first_df)}")
print(f"データフレームのクラス: {first_df.__class__.__name__}")

# スキーマ（データ構造）を確認
# この情報はメタデータとして保持されているため、データを読まずに表示できます
print(f"\nスキーマ情報: {first_df}")
print("\n詳細なスキーマ:")
first_df.printSchema()

### Step 2: トランスフォーメーションのチェーン
複数のトランスフォーメーションを連鎖させて、処理のパイプラインを構築します。

In [0]:
# selectExpr()を使って、SQL式で新しいカラムを作成
# まだ実行されません - 処理の定義を追加するだけ
doubled_df = first_df.selectExpr(
    "id as original_id",           # 元のIDカラムをリネーム
    "(id * 2) as doubled_value"     # IDを2倍にした新しいカラム
)

# さらにトランスフォーメーションを追加
# メソッドチェーンで複数の処理を連結できます
final_df = doubled_df.selectExpr(
    "original_id",
    "doubled_value",
    "(doubled_value / 1000) as divided_by_1000",     # 1000で割る
    "(doubled_value % 100) as modulo_100"            # 100で割った余り
)

# この時点でもまだ処理は実行されていません
print("トランスフォーメーションのチェーンが定義されました")
print("実際の処理はアクションが呼ばれるまで実行されません")

### Step 3: アクションで処理を実行
アクションを呼び出すことで、定義した処理が実際に実行されます。

In [0]:
# take()アクションで上位N件を取得
# ここで初めてSparkが処理を実行します！
print("🎯 処理を実行中...")
results = final_df.take(10)  # 上位10件を取得

# 結果を見やすく表示
print("\n📊 処理結果の最初の10件:")
print("-" * 80)
print(f"{'元のID':>10} | {'2倍の値':>10} | {'1000で割った値':>15} | {'100の余り':>10}")
print("-" * 80)

for row in results:
    print(f"{row['original_id']:10d} | "
          f"{row['doubled_value']:10d} | "
          f"{row['divided_by_1000']:15.2f} | "
          f"{row['modulo_100']:10d}")

### 🎨 Databricksの可視化機能を活用
`display()`関数を使うと、結果をインタラクティブに表示できます。

In [0]:
# display()もアクションとして動作します
# 上位100件を表示（大量データの場合は自動的に制限されます）
print("📈 インタラクティブな表示:")
display(final_df.limit(100))  # limit()はトランスフォーメーション、display()がアクション

## 📊 第4部: 実データで学ぶデータ処理

### 💎 ダイヤモンドデータセットの紹介
ここからは、実際のデータセットを使って実践的なデータ処理を学びます。
使用するのは、約54,000個のダイヤモンドの品質と価格に関するデータです。

![](img/pipeline.png)

### データの読み込み
CSV形式のファイルをSparkデータフレームに読み込みます。

In [0]:
# Databricksに事前に用意されているサンプルデータのパス
data_path = "/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv"

# CSVファイルの読み込み
# spark.read を使用してデータを読み込みます
diamonds = (
    spark.read.format("csv")
    .option("header", "true")         # 1行目をヘッダーとして扱う
    .option("inferSchema", "true")    # データ型を自動推論
    .load(data_path)                   # ファイルパスを指定して読み込み
)

# データセットの基本情報を表示
print("📊 データセットの概要:")
print(f"  総レコード数: {diamonds.count():,} 件")
print(f"  カラム数: {len(diamonds.columns)} 個")
print("\n📝 カラム一覧:")
for i, col in enumerate(diamonds.columns, 1):
    print(f"  {i:2d}. {col}")

### データの探索
データの中身を確認して、どのような情報が含まれているか理解します。

In [0]:
# show()メソッドで最初の数行を表示
# truncate=Falseで文字列が切れないように表示
print("💎 ダイヤモンドデータの最初の5行:")
diamonds.show(5, truncate=False)

# 各カラムの説明
print("\n📖 カラムの説明:")
print("  - carat: カラット（重さ）")
print("  - cut: カットの品質（Fair, Good, Very Good, Premium, Ideal）")
print("  - color: 色のグレード（D=最高 から J=最低）")
print("  - clarity: 透明度（I1=最低, SI2, SI1, VS2, VS1, VVS2, VVS1, IF=最高）")
print("  - price: 価格（USドル）")

In [0]:
# データ型（スキーマ）の詳細を確認
print("📋 データ型の詳細:")
diamonds.printSchema()

# 各データ型の説明
print("\n💡 データ型の説明:")
print("  - string: 文字列型（カテゴリカルデータ）")
print("  - double: 浮動小数点型（連続値）")
print("  - integer: 整数型")

### データクレンジング
分析しやすいようにデータを整形します。

In [0]:
from pyspark.sql.functions import col

# カラム名の変更とデータ型の調整
diamonds_clean = (
    diamonds
    .withColumnRenamed("_c0", "index")           # 最初のカラムをindexに変更
    .withColumn("price", col("price").cast("integer"))  # priceを整数型に変換
    .withColumn("carat", col("carat").cast("float"))    # caratを浮動小数点型に変換
)

# クレンジング結果を確認
print("✨ クレンジング後のデータ:")
display(diamonds_clean.limit(5))

In [0]:
# NULL値の確認
# 実データでは欠損値の確認が重要です
from pyspark.sql.functions import count, when, isnan

print("🔍 各カラムのNULL値をチェック:")

# 各カラムのNULL値をカウントする処理の詳細
# 1. diamonds_clean.columns で全カラム名のリストを取得
# 2. 各カラムに対してcount(when(...))を適用
#    - when(col(c).isNull(), c): カラムcがNULLの場合、カラム名を返す
#    - count(): NULLでない値（つまり上記whenで返されたカラム名）をカウント
#    - alias(c): 結果のカラム名を元のカラム名と同じにする
# 3. select([...])で全カラムのNULLカウントを一度に取得

null_counts = diamonds_clean.select([
    count(when(col(c).isNull(), c)).alias(c) 
    for c in diamonds_clean.columns
])

# 結果を表示
# 各カラムの下に表示される数値がNULL値の個数
# 0が表示されていれば、そのカラムにはNULL値が存在しない
display(null_counts)

# 追加情報: isnan()関数について
# isnan()は数値カラムのNaN（Not a Number）をチェックする関数
# このデータセットでは文字列カラムも含まれているため、
# isNull()を使用してNULL値をチェックしています

print("\n✅ このデータセットには欠損値がないことが確認できました")
print("   （全カラムでNULL値のカウントが0）")

## 📈 第5部: データ分析の実践

### 基本的な集計処理
GroupByとAggregation関数を使って、データを集計します。

In [0]:
# 必要な集計関数をインポート
from pyspark.sql.functions import avg, max, min, stddev, round, count

# カット品質ごとの価格統計を計算
cut_analysis = (
    diamonds_clean
    .groupBy("cut")                              # カット品質でグループ化
    .agg(
        count("*").alias("個数"),                 # レコード数をカウント
        round(avg("price"), 2).alias("平均価格"),  # 平均価格（小数点2桁）
        round(min("price"), 2).alias("最低価格"),  # 最低価格
        round(max("price"), 2).alias("最高価格"),  # 最高価格
        round(stddev("price"), 2).alias("価格の標準偏差")  # 価格のばらつき
    )
    .orderBy("平均価格", ascending=False)          # 平均価格の降順でソート
)

print("💎 カット品質別の価格分析:")
print("（注: 興味深いことに、'Ideal'カットが最も安い傾向があります）")
display(cut_analysis)

### 複雑な分析: カテゴリ分けと複合集計
条件分岐を使ったカテゴリ分けと、複数の軸での集計を行います。

In [0]:
from pyspark.sql.functions import when, col

# カラットサイズでカテゴリ分け
# when()関数で条件分岐を実装
# 番号を付けることで、表示時に適切な順序でソートされるようにする
diamonds_categorized = diamonds_clean.withColumn(
    "carat_category",
    when(col("carat") < 0.5, "1_小（< 0.5ct）")
    .when(col("carat") < 1.0, "2_中（0.5-1.0ct）")
    .when(col("carat") < 1.5, "3_大（1.0-1.5ct）")
    .otherwise("4_特大（>= 1.5ct）")
)

# カテゴリとカットの組み合わせで集計
category_analysis = (
    diamonds_categorized
    .groupBy("carat_category", "cut")            # 2つの軸でグループ化
    .agg(
        count("*").alias("個数"),
        round(avg("price"), 2).alias("平均価格")
    )
    .orderBy("carat_category", "cut")            # カテゴリ、カットの順でソート
)

print("📊 カラットカテゴリー別・カット別の分析:")
print("（番号付きラベルにより、小→中→大→特大の順で表示されます）")
print("（サイズが大きくなるほど価格が上昇することが確認できます）")
display(category_analysis)

Databricks visualization. Run in Databricks to view.

### ウィンドウ関数を使った高度な分析
ウィンドウ関数を使うと、グループ内でのランキングや累積値などを計算できます。

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, rank, dense_rank

# ウィンドウの定義: カット品質ごとに価格でソート
window_spec = Window.partitionBy("cut").orderBy(col("price").desc())

# 各カット品質内での価格ランキングを計算
top_diamonds = (
    diamonds_clean
    .withColumn("price_rank", row_number().over(window_spec))  # 順位を付与
    .filter(col("price_rank") <= 3)                           # トップ3のみ抽出
    .select("cut", "carat", "color", "clarity", "price", "price_rank")
    .orderBy("cut", "price_rank")                              # 表示用にソート
)

print("🏆 各カット品質における最も高価なダイヤモンドTop3:")
display(top_diamonds)

## 🚀 第6部: SQLとの連携

### Spark SQLの活用
SparkデータフレームをSQLで操作する方法を学びます。
SQLに慣れている方にとって、非常に親しみやすいインターフェースです。

![](img/dataframe_sql.png)

In [0]:
# データフレームを一時ビューとして登録
# これにより、SQLクエリでデータフレームを参照できるようになります
diamonds_clean.createOrReplaceTempView("diamonds_view")

# SQLクエリを実行
# spark.sql()メソッドでSQLを実行し、結果をデータフレームとして取得
sql_result = spark.sql("""
    SELECT 
        cut,                                      -- カット品質
        color,                                    -- 色のグレード
        COUNT(*) as count,                        -- レコード数
        ROUND(AVG(price), 2) as avg_price,       -- 平均価格
        ROUND(AVG(carat), 3) as avg_carat        -- 平均カラット
    FROM diamonds_view
    WHERE price > 1000                           -- 1000ドル以上のダイヤモンドのみ
    GROUP BY cut, color                          -- カットと色でグループ化
    HAVING COUNT(*) > 100                        -- 100個以上のグループのみ
    ORDER BY avg_price DESC                      -- 平均価格の降順
    LIMIT 10                                      -- 上位10件
""")

print("🔍 SQL分析結果（価格1000ドル以上、100個以上のグループ）:")
display(sql_result)

### SQLマジックコマンドの使用
Databricksでは`%sql`マジックコマンドを使って、セル全体をSQLクエリとして実行できます。

In [0]:
%sql
-- SQLマジックコマンドで直接SQL実行
-- より複雑な分析: 透明度ごとの統計
SELECT 
    clarity,                                     -- 透明度グレード
    COUNT(DISTINCT cut) as cut_variations,      -- カット種類の数
    COUNT(*) as total_count,                    -- 総数
    MIN(price) as min_price,                    -- 最低価格
    MAX(price) as max_price,                    -- 最高価格
    ROUND(MAX(price) - MIN(price), 2) as price_range  -- 価格レンジ
FROM diamonds_view
GROUP BY clarity
ORDER BY total_count DESC

## 🎯 第7部: パフォーマンス最適化の基礎知識

### 📚 最適化テクニックの理解

Sparkのパフォーマンス最適化には以下のような手法があります：

1. **キャッシング**: 頻繁に使用するデータフレームをメモリに保持
2. **パーティショニング**: データを適切なサイズに分割
3. **ブロードキャスト結合**: 小さなテーブルを各ノードに配布
4. **述語プッシュダウン**: フィルタ条件を早期に適用

### サーバレスコンピュートでの最適化

Databricksのサーバレスコンピュートでは、多くの最適化が自動的に行われます。
以下、実践可能な最適化テクニックを紹介します：

In [0]:
# 最適化のベストプラクティスを実演

# 1. フィルタの早期適用（述語プッシュダウン）
# 良い例: フィルタを早期に適用
optimized_df = (
    diamonds_clean
    .filter(col("price") > 5000)      # まずフィルタ（データ量を削減）
    .groupBy("cut")                   # その後集計
    .agg(avg("carat").alias("avg_carat"))
)

print("✅ 最適化された処理: フィルタを先に適用")
display(optimized_df)

# 2. 必要なカラムのみ選択
# データ転送量を削減
selected_columns = (
    diamonds_clean
    .select("cut", "price", "carat")  # 必要なカラムのみ選択
    .filter(col("price") > 10000)
)

print("\n✅ 必要なカラムのみを選択して処理")
print(f"選択されたカラム数: {len(selected_columns.columns)}")

### 最適化のベストプラクティス

サーバレス環境で効果的な最適化手法：

1. **早期フィルタリング**: WHERE句は可能な限り早い段階で適用
2. **カラムの選択**: 必要なカラムのみをSELECT
3. **適切な結合順序**: 小さいテーブルを先に結合
4. **パーティション述語**: パーティション化されたテーブルでは適切なフィルタを使用

![](img/optimization.png)

## 🔄 第8部: Pandas API on Sparkの活用

### Pandas APIでSparkを使う
Pandasに慣れている方でも、Sparkの分散処理能力を活用できます。

In [0]:
import pyspark.pandas as ps
import numpy as np

# Pandas API on Sparkでデータフレームを作成
# 通常のPandasと同じ構文で記述できます
ps_df = ps.DataFrame({
    'A': np.random.rand(1000),     # ランダムな値（0-1）
    'B': np.random.rand(1000),     # ランダムな値（0-1）
    'C': np.random.randn(1000)     # 正規分布に従うランダム値
})

# Pandasと同じメソッドが使える
print("📊 Pandas API on Sparkの統計情報:")
print(ps_df.describe())  # 基本統計量の表示

print("\n🔝 上位5件（Aカラムでソート）:")
# sort_valuesやheadなど、Pandasでお馴染みのメソッドが使用可能
print(ps_df.sort_values('A', ascending=False).head())

### 既存のSparkデータフレームをPandas APIで操作

In [0]:
# SparkデータフレームをPandas API on Sparkに変換
diamonds_ps = diamonds_clean.to_pandas_on_spark()

# Pandasスタイルでの基本統計
print("💰 価格カラムの統計情報（Pandas API）:")
price_stats = diamonds_ps['price'].describe()
print(price_stats)

# 基本的な集計を個別に実行
print("\n📈 カット別の基本統計:")
cut_types = diamonds_ps['cut'].unique().to_numpy()  # NumPy配列に変換
for cut_type in cut_types[:3]:  # デモのため最初の3つのみ表示
    cut_data = diamonds_ps[diamonds_ps['cut'] == cut_type]['price']
    print(f"\n{cut_type}:")
    print(f"  平均: ${cut_data.mean():.2f}")
    print(f"  標準偏差: ${cut_data.std():.2f}")
    print(f"  件数: {len(cut_data):,}")

## 💾 第9部: Unity Catalogとデータの永続化

### 📚 Unity Catalogとは

Unity Catalogは、Databricksのデータガバナンスソリューションです：
- **統一されたメタストア**: すべてのデータ資産を一元管理
- **細かいアクセス制御**: テーブル、カラムレベルでの権限管理
- **データリネージ**: データの流れを追跡
- **データ発見**: カタログを通じたデータの検索と理解

### SparkとUnity Catalogの関係

```
Unity Catalog階層:
├── カタログ (Catalog)
│   ├── スキーマ/データベース (Schema/Database)
│   │   ├── テーブル (Table)
│   │   ├── ビュー (View)
│   │   └── 関数 (Function)
```

![](img/uc.png)

### データの保存先
サーバレスコンピュートでは、ファイルシステムへの直接書き込みが制限されているため、
Unity Catalogのテーブルとしてデータを保存します。

In [0]:
# 現在のカタログとスキーマを確認
current_catalog = spark.sql("SELECT current_catalog()").collect()[0][0]
current_schema = spark.sql("SELECT current_database()").collect()[0][0]

print("📍 現在の位置:")
print(f"  カタログ: {current_catalog}")
print(f"  スキーマ: {current_schema}")

# workspace.defaultスキーマを使用
# これは個人用の作業領域として提供されています
spark.sql("USE workspace.default")
print("\n✅ workspace.defaultスキーマに切り替えました")

### マネージドテーブルとしての保存
Unity Catalogの[マネージドテーブル](https://docs.databricks.com/aws/ja/tables/)としてデータを保存します。

In [0]:
import uuid
from datetime import datetime

# ユニークなテーブル名を生成（衝突を避けるため）
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
unique_suffix = str(uuid.uuid4())[:8]
table_name = f"diamonds_analysis_{timestamp}_{unique_suffix}"

# テーブルとして保存
# Delta形式で保存されます（Databricksのデフォルト）
(
    diamonds_clean.write
    .mode("overwrite")          # 既存テーブルがあれば上書き
    .saveAsTable(f"workspace.default.{table_name}")
)

print(f"✅ テーブルが保存されました: workspace.default.{table_name}")
print("\n📊 保存されたテーブルの情報:")
spark.sql(f"DESCRIBE EXTENDED workspace.default.{table_name}").show(truncate=False)

以下のセルを実行して表示されるリンクをクリックしてテーブルを確認してみましょう。Databricksでは[**カタログエクスプローラ**](https://docs.databricks.com/aws/ja/catalog-explorer/)というUIでテーブルやデータベース、ファイル、モデルなどにアクセスすることができます。

In [0]:
displayHTML(f"<a href='/explore/data/workspace/default/{table_name}'>保存したテーブル</a>")

### 保存したテーブルの読み込み

In [0]:
# テーブルからデータを読み込み
loaded_df = spark.table(f"workspace.default.{table_name}")

# 読み込んだデータを確認
print(f"📥 テーブルから読み込んだデータ:")
print(f"  レコード数: {loaded_df.count():,}")
print(f"  カラム数: {len(loaded_df.columns)}")

# SQLでもアクセス可能
sql_query = f"""
SELECT cut, COUNT(*) as count, AVG(price) as avg_price
FROM workspace.default.{table_name}
GROUP BY cut
ORDER BY avg_price DESC
"""

print("\n🔍 SQLでテーブルをクエリ:")
display(spark.sql(sql_query))

### Delta Lake形式の利点
Databricksでは、テーブルはDelta Lake形式で保存されます。

In [0]:
# Delta Lakeの特徴を確認
print("🏞️ Delta Lake形式の利点:")
print("  1. ACID トランザクション: データの一貫性を保証")
print("  2. タイムトラベル: 過去のバージョンにアクセス可能")
print("  3. スキーマエボリューション: スキーマの変更に対応")
print("  4. データ圧縮: 効率的なストレージ使用")
print("  5. Z-Ordering: クエリパフォーマンスの最適化")

# テーブルの履歴を確認
print(f"\n📜 テーブルの変更履歴:")
history_df = spark.sql(f"DESCRIBE HISTORY workspace.default.{table_name} LIMIT 5")
display(history_df.select("version", "timestamp", "operation", "operationMetrics"))

## 🧹 第10部: クリーンアップとベストプラクティス

### テーブルのクリーンアップ

In [0]:
# 作成したテーブルを削除（もくもく会終了時）
# コメントを外して実行してください
# spark.sql(f"DROP TABLE IF EXISTS workspace.default.{table_name}")
# print(f"🧹 テーブル {table_name} を削除しました")

print("⚠️ クリーンアップを実行する場合は、上記のコメントを外してください")

### 📚 学んだことのまとめ

このもくもく会で学んだ重要なポイント：

#### 1. **Sparkの基本概念**
   - 分散処理アーキテクチャ（ドライバーとエグゼキューター）
   - 遅延評価（トランスフォーメーションとアクション）
   - データフレームAPI
   - Pandasとの違いと使い分け

#### 2. **データ処理テクニック**
   - データの読み込みとクレンジング
   - 集計とグループ化
   - ウィンドウ関数の活用

#### 3. **SQL統合**
   - SparkデータフレームのSQL操作
   - 一時ビューの活用
   - SQLマジックコマンド

#### 4. **Unity Catalogとの統合**
   - カタログ・スキーマ・テーブルの階層構造
   - マネージドテーブルとしてのデータ保存
   - Delta Lake形式の利点

#### 5. **ベストプラクティス**
   - フィルタの早期適用
   - 必要なカラムのみ選択
   - 適切なデータ形式の選択

## 🎓 次のステップ

### さらに学習を深めるために

1. [**構造化ストリーミング**](https://docs.databricks.com/aws/ja/structured-streaming/concepts): リアルタイムデータ処理
2. [**MLlib**](https://docs.databricks.com/aws/ja/machine-learning/train-model/mllib): Sparkの機械学習ライブラリ
3. [**Delta Lake**](https://docs.databricks.com/aws/ja/delta/): より高度なデータレイク機能
4. [**Photon**](https://docs.databricks.com/aws/ja/compute/photon): Databricksの高速実行エンジン

### 参考リソース
- [Apache Spark公式ドキュメント](https://spark.apache.org/docs/latest/)
- [Apache Spark徹底入門](https://www.amazon.co.jp/dp/B0CVQ84T6J/)
- [Apache Sparkとは何か \- Qiita](https://qiita.com/taka_yayoi/items/31190da754106b2d284e)
- [PySparkことはじめ  \- Qiita](https://qiita.com/taka_yayoi/items/a7ee6287031374efa88a)
- [Databricks Learning](https://www.databricks.com/jp/learn)
- [PySpark APIリファレンス](https://spark.apache.org/docs/latest/api/python/)
- [Unity Catalogドキュメント](https://docs.databricks.com/ja/data-governance/unity-catalog/index.html)

## 🚀 チャレンジ問題

もくもく会の残り時間で以下の課題に挑戦してみましょう！

### 課題1: フィルタリングと集計
カラットが1.0以上のダイヤモンドについて、カラー別・クラリティ別の平均価格を計算してください。

In [0]:
# ヒント: 
# 1. filter()でカラット >= 1.0のデータを抽出
# 2. groupBy()で「color」と「clarity」でグループ化
# 3. agg()で平均価格を計算
# 4. orderBy()で結果をソート

# あなたのコードをここに書いてください


#### 📝 課題1の解答例（クリックして展開）
<details>
<summary>解答を表示</summary>
</details>

In [0]:
# 解答例1: DataFrame APIを使用
from pyspark.sql.functions import avg, round, count

large_diamonds_analysis = (
    diamonds_clean
    .filter(col("carat") >= 1.0)                # カラット1.0以上をフィルタ
    .groupBy("color", "clarity")                # カラーと透明度でグループ化
    .agg(
        count("*").alias("個数"),
        round(avg("price"), 2).alias("平均価格")  # 平均価格を計算
    )
    .orderBy("平均価格", ascending=False)        # 平均価格の降順でソート
)

print("💎 1カラット以上のダイヤモンド分析結果:")
display(large_diamonds_analysis.limit(20))

# 解答例2: SQLを使用
sql_solution1 = spark.sql("""
    SELECT 
        color,
        clarity,
        COUNT(*) as count,
        ROUND(AVG(price), 2) as avg_price
    FROM diamonds_view
    WHERE carat >= 1.0
    GROUP BY color, clarity
    ORDER BY avg_price DESC
    LIMIT 20
""")

print("\n🔍 SQL版の結果:")
display(sql_solution1)

### 課題2: ウィンドウ関数で中央値を見つける
各カラーグループ内で、価格が中央値に最も近いダイヤモンドを見つけてください。

In [0]:
# ヒント:
# 1. 各カラーグループの中央値を計算
# 2. ウィンドウ関数でランク付け
# 3. 中央値との差が最小のレコードを抽出

# あなたのコードをここに書いてください


#### 📝 課題2の解答例（クリックして展開）
<details>
<summary>解答を表示</summary>
</details>

In [0]:
# 解答例: ウィンドウ関数を使った中央値の近似
from pyspark.sql.functions import percentile_approx, abs, row_number
from pyspark.sql.window import Window

# Step 1: 各カラーグループの中央値を計算
color_median = (
    diamonds_clean
    .groupBy("color")
    .agg(percentile_approx("price", 0.5).alias("median_price"))
)

# Step 2: 元のデータと中央値をJOIN
diamonds_with_median = (
    diamonds_clean
    .join(color_median, on="color")
    .withColumn("diff_from_median", abs(col("price") - col("median_price")))  # 中央値との差
)

# Step 3: ウィンドウ関数で各カラー内でランク付け
window_median = Window.partitionBy("color").orderBy("diff_from_median")

closest_to_median = (
    diamonds_with_median
    .withColumn("rank", row_number().over(window_median))
    .filter(col("rank") == 1)  # 最も中央値に近いものを選択
    .select("color", "carat", "cut", "clarity", "price", "median_price", "diff_from_median")
    .orderBy("color")
)

print("🎯 各カラーグループで価格が中央値に最も近いダイヤモンド:")
display(closest_to_median)

# 別解: SQL版
sql_solution2 = spark.sql("""
    WITH color_median AS (
        SELECT 
            color,
            PERCENTILE_APPROX(price, 0.5) as median_price
        FROM diamonds_view
        GROUP BY color
    ),
    ranked_diamonds AS (
        SELECT 
            d.*,
            m.median_price,
            ABS(d.price - m.median_price) as diff_from_median,
            ROW_NUMBER() OVER (PARTITION BY d.color ORDER BY ABS(d.price - m.median_price)) as rn
        FROM diamonds_view d
        JOIN color_median m ON d.color = m.color
    )
    SELECT 
        color, carat, cut, clarity, price, median_price, diff_from_median
    FROM ranked_diamonds
    WHERE rn = 1
    ORDER BY color
""")

print("\n🔍 SQL版の結果:")
display(sql_solution2)

### 課題3: 複雑なJOINと分析
カット別とカラー別の統計を別々に計算し、JOINして最も高価な組み合わせを見つけてください。

In [0]:
# ヒント:
# 1. カット別の平均価格データフレームを作成
# 2. カラー別の平均価格データフレームを作成  
# 3. 両方のデータフレームをクロスJOIN
# 4. 組み合わせごとの予想価格を計算

# あなたのコードをここに書いてください


#### 📝 課題3の解答例（クリックして展開）
<details>
<summary>解答を表示</summary>
</details>

In [0]:
# 解答例: 複雑なJOINと分析
from pyspark.sql.functions import avg, round, count, max as spark_max

# Step 1: カット別の統計
cut_stats = (
    diamonds_clean
    .groupBy("cut")
    .agg(
        count("*").alias("cut_count"),
        round(avg("price"), 2).alias("cut_avg_price"),
        round(avg("carat"), 3).alias("cut_avg_carat")
    )
)

# Step 2: カラー別の統計
color_stats = (
    diamonds_clean
    .groupBy("color")
    .agg(
        count("*").alias("color_count"),
        round(avg("price"), 2).alias("color_avg_price"),
        round(avg("carat"), 3).alias("color_avg_carat")
    )
)

# Step 3: 実際の組み合わせの統計
actual_combination = (
    diamonds_clean
    .groupBy("cut", "color")
    .agg(
        count("*").alias("actual_count"),
        round(avg("price"), 2).alias("actual_avg_price"),
        spark_max("price").alias("max_price")
    )
)

# Step 4: すべてをJOINして分析
combined_analysis = (
    actual_combination
    .join(cut_stats, on="cut")
    .join(color_stats, on="color")
    .withColumn(
        "predicted_price",
        round((col("cut_avg_price") + col("color_avg_price")) / 2, 2)
    )
    .withColumn(
        "price_difference",
        round(col("actual_avg_price") - col("predicted_price"), 2)
    )
    .select(
        "cut", "color",
        "actual_count",
        "actual_avg_price",
        "predicted_price",
        "price_difference",
        "max_price"
    )
    .orderBy(col("actual_avg_price").desc())
)

print("💰 最も高価な組み合わせ Top 10:")
display(combined_analysis.limit(10))

# 追加分析: 予測と実際の差が大きい組み合わせ
surprising_combinations = (
    combined_analysis
    .orderBy(abs(col("price_difference")).desc())
    .limit(10)
)

print("\n😮 予測と実際の価格差が大きい組み合わせ:")
display(surprising_combinations)

# SQL版の解答
sql_solution3 = spark.sql("""
    WITH cut_stats AS (
        SELECT 
            cut,
            COUNT(*) as cut_count,
            ROUND(AVG(price), 2) as cut_avg_price
        FROM diamonds_view
        GROUP BY cut
    ),
    color_stats AS (
        SELECT 
            color,
            COUNT(*) as color_count,
            ROUND(AVG(price), 2) as color_avg_price
        FROM diamonds_view
        GROUP BY color
    ),
    actual_combo AS (
        SELECT 
            cut,
            color,
            COUNT(*) as actual_count,
            ROUND(AVG(price), 2) as actual_avg_price,
            MAX(price) as max_price
        FROM diamonds_view
        GROUP BY cut, color
    )
    SELECT 
        a.cut,
        a.color,
        a.actual_count,
        a.actual_avg_price,
        ROUND((cs.cut_avg_price + co.color_avg_price) / 2, 2) as predicted_price,
        a.max_price,
        ROUND(a.actual_avg_price - (cs.cut_avg_price + co.color_avg_price) / 2, 2) as price_diff
    FROM actual_combo a
    JOIN cut_stats cs ON a.cut = cs.cut
    JOIN color_stats co ON a.color = co.color
    ORDER BY a.actual_avg_price DESC
    LIMIT 10
""")

print("\n🔍 SQL版の最も高価な組み合わせ:")
display(sql_solution3)

## 📝 振り返りメモ

今日学んだことを記録しておきましょう：

In [0]:
# 今日の学び（自由に編集してください）
my_learnings = """
1. Sparkの基本概念：
   - 分散処理の仕組みが理解できた
   - 遅延評価の重要性がわかった
   
2. 特に面白かった機能：
   - ウィンドウ関数での高度な分析
   - SQLとDataFrame APIの連携
   
3. 実務で使えそうなテクニック：
   - 大規模データの集計処理
   - Unity Catalogでのデータ管理
   
4. もっと深く学びたいトピック：
   - 構造化ストリーミング
   - 機械学習との連携
"""

print(my_learnings)

# メモを保存したい場合は、以下のコメントを外してテーブルとして保存
# spark.sql(f"CREATE OR REPLACE TABLE workspace.default.my_learning_memo AS SELECT '{my_learnings}' as memo")

## 🎉 お疲れさまでした！

Apache Sparkの世界への第一歩を踏み出しました。今日学んだ基礎を活かして、より大規模で複雑なデータ処理に挑戦していってください！

### 💬 質問とディスカッション
- 不明な点があれば、遠慮なくメンターに質問してください
- 他の参加者と学びを共有しましょう
- 実務での活用アイデアを話し合いましょう

### 🔗 コミュニティ
- Databricks Community に参加して継続的に学習
- Apache Sparkユーザーグループでの情報交換
- 定期的なもくもく会への参加

**Happy Sparking! ✨**