### 必要モジュールのインストール

In [1]:
!pip install delta-spark



### Sparkセッションの生成

In [2]:
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

# SparkSessionの初期化
builder = (
    SparkSession.builder.appName("DeltaLakeExample")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)
# Delta Lakeで動作するように設定
spark = configure_spark_with_delta_pip(builder).getOrCreate()

### データの入出力

In [3]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# ===== データの作成
# スキーマを定義
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
])

# データを作成
data = [("Taro", 25), ("Hanako", 30), ("Yuki", 20)]

# スキーマを指定してデータフレームを生成
df = spark.createDataFrame(data, schema=schema)

# DataFrameの内容を表示
df.show()

+------+---+
|  name|age|
+------+---+
|  Taro| 25|
|Hanako| 30|
|  Yuki| 20|
+------+---+



In [4]:
# データの書き込み
(df.write
    .format("delta")
    .mode("overwrite")
    .save("./delta-table")
)

In [5]:
# データの読み込み
df_read = (spark.read
                .format("delta")
                .load("./delta-table")
)
df_read.show()

+------+---+
|  name|age|
+------+---+
|Hanako| 30|
|  Taro| 25|
|  Yuki| 20|
+------+---+



### タイムトラベルを使用する

In [6]:
# データを更新する
new_row = spark.createDataFrame([("Jiro", 40)], schema)

# データフレームに追加
df = df.union(new_row)
df.show()

# データの更新
(df.write
    .format("delta")
    .mode("overwrite")
    .save("./delta-table")
)

+------+---+
|  name|age|
+------+---+
|  Taro| 25|
|Hanako| 30|
|  Yuki| 20|
|  Jiro| 40|
+------+---+



In [7]:
# ===== タイムトラベルを使用する
from delta import DeltaTable

# Delta Lakeの履歴情報を取得する
delta_table = DeltaTable.forPath(spark, "./delta-table")
history = delta_table.history()

# すべての履歴を表示
history.show(truncate=False)
# バージョンのみ選択して表示
history.select("version").show(truncate=False)
# タイムスタンプのみ選択して表示
history.select("timestamp").show(truncate=False)

+-------+-----------------------+------+--------+---------+--------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------+------------+-----------------------------------+
|version|timestamp              |userId|userName|operation|operationParameters                   |job |notebook|clusterId|readVersion|isolationLevel|isBlindAppend|operationMetrics                                           |userMetadata|engineInfo                         |
+-------+-----------------------+------+--------+---------+--------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------+------------+-----------------------------------+
|1      |2023-12-09 10:09:36.239|NULL  |NULL    |WRITE    |{mode -> Overwrite, partitionBy -> []}|NULL|NULL    |NULL     |0          |Serializable  |false        |{numFiles -> 5, nu

In [8]:
# 特定のバージョンを使用してアクセスする
df_version0 = spark.read.format("delta").option("versionAsOf", 0).load("./delta-table")
df_version0.show()

df_version1 = spark.read.format("delta").option("versionAsOf", 1).load("./delta-table")
df_version1.show()

+------+---+
|  name|age|
+------+---+
|Hanako| 30|
|  Taro| 25|
|  Yuki| 20|
+------+---+

+------+---+
|  name|age|
+------+---+
|Hanako| 30|
|  Taro| 25|
|  Yuki| 20|
|  Jiro| 40|
+------+---+



In [10]:
# タイムスタンプを使用してアクセスする
timestamp = "2023-12-09 10:09:30.767"

df_timestamp = spark.read.format("delta").option("timestampAsOf", timestamp).load("./delta-table")
df_timestamp.show()

+------+---+
|  name|age|
+------+---+
|Hanako| 30|
|  Taro| 25|
|  Yuki| 20|
+------+---+



In [11]:
# データを特定のバージョンにロールバックする
# バージョンを指定する
rollback_version = 0

# 該当データを読み込む
df = spark.read.format("delta").option("versionAsOf", rollback_version).load("./delta-table")

# テーブルを上書きする
df.write.format("delta").mode("overwrite").save("./delta-table")

# Delta Lakeの履歴情報を取得する
delta_table = DeltaTable.forPath(spark, "./delta-table")
history = delta_table.history()
history.show(truncate=False)

+-------+-----------------------+------+--------+---------+--------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------+------------+-----------------------------------+
|version|timestamp              |userId|userName|operation|operationParameters                   |job |notebook|clusterId|readVersion|isolationLevel|isBlindAppend|operationMetrics                                           |userMetadata|engineInfo                         |
+-------+-----------------------+------+--------+---------+--------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------+------------+-----------------------------------+
|2      |2023-12-09 10:09:51.892|NULL  |NULL    |WRITE    |{mode -> Overwrite, partitionBy -> []}|NULL|NULL    |NULL     |1          |Serializable  |false        |{numFiles -> 3, nu

### スキーマの進化

In [12]:
from pyspark.sql.functions import lit

# Delta Lakeに列を追加する
df = df.withColumn("flg", lit(1))
df.show()

# テーブルを上書きする ←これはエラーになる
df.write.format("delta").mode("overwrite").save("./delta-table")

+------+---+---+
|  name|age|flg|
+------+---+---+
|Hanako| 30|  1|
|  Taro| 25|  1|
|  Yuki| 20|  1|
+------+---+---+



AnalysisException: A schema mismatch detected when writing to the Delta table (Table ID: 38964833-dc9b-49a6-bfe5-b0c9142989fa).
To enable schema migration using DataFrameWriter or DataStreamWriter, please set:
'.option("mergeSchema", "true")'.
For other operations, set the session configuration
spark.databricks.delta.schema.autoMerge.enabled to "true". See the documentation
specific to the operation for details.

Table schema:
root
-- name: string (nullable = true)
-- age: integer (nullable = true)


Data schema:
root
-- name: string (nullable = true)
-- age: integer (nullable = true)
-- flg: integer (nullable = true)

         
To overwrite your schema or change partitioning, please set:
'.option("overwriteSchema", "true")'.

Note that the schema can't be overwritten when using
'replaceWhere'.
         

In [14]:
# 特定の書き込みにのみスキーマの進化を適用したい場合
(df.write
    .format("delta")
    .option("mergeSchema", "true")
    .mode("overwrite")
    .save("./delta-table")
)

In [15]:
# スキーマの進化を有効化
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

# Delta Lakeに列を追加する
df = df.withColumn("country", lit("jpn"))
df.show()

# テーブルを上書きする
df.write.format("delta").mode("overwrite").save("./delta-table")

+------+---+---+-------+
|  name|age|flg|country|
+------+---+---+-------+
|Hanako| 30|  1|    jpn|
|  Taro| 25|  1|    jpn|
|  Yuki| 20|  1|    jpn|
+------+---+---+-------+



### メタデータの管理

In [16]:
# テーブル情報の取得
delta_table = DeltaTable.forPath(spark, "./delta-table")

In [17]:
# テーブル情報の表示
delta_table.toDF().show()

+------+---+---+-------+
|  name|age|flg|country|
+------+---+---+-------+
|Hanako| 30|  1|    jpn|
|  Yuki| 20|  1|    jpn|
|  Taro| 25|  1|    jpn|
+------+---+---+-------+



In [18]:
# テーブルの履歴とバージョン
delta_table.history().show(truncate=False)

+-------+-----------------------+------+--------+---------+--------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------+------------+-----------------------------------+
|version|timestamp              |userId|userName|operation|operationParameters                   |job |notebook|clusterId|readVersion|isolationLevel|isBlindAppend|operationMetrics                                           |userMetadata|engineInfo                         |
+-------+-----------------------+------+--------+---------+--------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------+------------+-----------------------------------+
|5      |2023-12-09 10:10:00.117|NULL  |NULL    |WRITE    |{mode -> Overwrite, partitionBy -> []}|NULL|NULL    |NULL     |4          |Serializable  |false        |{numFiles -> 3, nu

In [19]:
# テーブルのファイル情報
spark.sql("DESCRIBE DETAIL './delta-table'").show(truncate=False)

+------+------------------------------------+----+-----------+----------------------------------------------------------------------------------------------------------+-----------------------+-----------------------+----------------+--------+-----------+----------+----------------+----------------+------------------------+
|format|id                                  |name|description|location                                                                                                  |createdAt              |lastModified           |partitionColumns|numFiles|sizeInBytes|properties|minReaderVersion|minWriterVersion|tableFeatures           |
+------+------------------------------------+----+-----------+----------------------------------------------------------------------------------------------------------+-----------------------+-----------------------+----------------+--------+-----------+----------+----------------+----------------+------------------------+
|delta |38964833-dc9b-

### セッションを終了

In [20]:
# SparkSessionを終了
spark.stop()