# Delta Lake Quickstart
https://docs.delta.io/latest/quick-start.html#python

# Objective
An example of upserting data into a Delta table.<br>
Deletions are assumed to be logical (soft) deletes marked by a delete flag.

# Data Summary
## Target data
`/workspace/csv/products_10.csv`<br>
The target data is a persisted Delta table, intended for use as a Lakehouse.
| id | name | short_name | kind | material | price | description | delete_flg |
| --- | --- | --- | --- | --- | --- | --- | --- |
| 1 | Fantastic Granite Pizza | Shoes | Small | Cotton | 3067 | nothing | 0 |
| 2 | Rustic Soft Fish | Pants | Handmade | Steel | 7107 | nothing | 0 |
| 3 | Licensed Cotton Hat | Sausages | Fantastic | Metal | 9050 | nothing | 0 |
| 4 | Luxurious Bronze Towels | Chair | Intelligent | Plastic | 4967 | nothing | 0 |
| 5 | Generic Wooden Towels | Mouse | Small | Metal | 4428 | still | 0 |
| 6 | Handcrafted Cotton Sausages | Computer | Licensed | Plastic | 5047 | update | 0 |
| 7 | Handmade Wooden Towels | Chips | Small | Steel | 7796 | update | 0 |
| 8 | Licensed Metal Fish | Computer | Refined | Rubber | 7437 | delete_flg | 0 |
| 9 | Electronic Concrete Chair | Cheese | Incredible | Metal | 1062 | delete_flg | 0 |
| 10 | Incredible Metal Computer | Pants | Unbranded | Rubber | 6368 | delete_flg | 0 |

## Source data
`/workspace/csv/products_15.csv`<br>
The source data is intended as a temporary (staging) table that holds incoming data during ELT before it is merged into the target data.<br>
Because it is temporary, it should be removed after use. Note, however, that dropping an unmanaged table removes only its metadata and leaves the data files in place.<br>
※ Dropping a managed table also deletes its data.
| id | name | short_name | kind | material | price | description | delete_flg |
| --- | --- | --- | --- | --- | --- | --- | --- |
| 5 | Generic Wooden Towels | Mouse | Small | Metal | 4428 | still | 0 |
| 6 | Handcrafted Cotton Sausages | Computer | 1 | 1 | 1 | update | 0 |
| 7 | Handmade Wooden Towels | Chips | 1 | 1 | 1 | update | 0 |
| 8 | Licensed Metal Fish | Computer | Refined | Rubber | 7437 | delete_flg | 1 |
| 9 | Electronic Concrete Chair | Cheese | Incredible | Metal | 1062 | delete_flg | 1 |
| 10 | Incredible Metal Computer | Pants | Unbranded | Rubber | 6368 | delete_flg | 1 |
| 11 | Oriental Rubber Gloves | Hat | Electronic | Metal | 7357 | insert | 0 |
| 12 | Elegant Soft Pizza | Hat | Practical | Cotton | 6231 | insert | 0 |
| 13 | Sleek Plastic Salad | Soap | Refined | Frozen | 9682 | insert | 0 |
| 14 | Refined Fresh Sausages | Tuna | Ergonomic | Granite | 3528 | insert | 0 |
| 15 | Elegant Steel Ball | Bacon | Rustic | Metal | 9283 | insert | 0 |
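Since the tables in this notebook are written by path, cleaning up the temporary table after use comes down to deleting its directory. The following is a minimal sketch of that cleanup, assuming the table lives on the local filesystem. The function name `drop_path_table` and the `allowed_root` guard are hypothetical and not part of the original notebook.

```python
import shutil
from pathlib import Path


def drop_path_table(table_path: str, allowed_root: str = "/workspace/tables") -> bool:
    """Delete the directory backing a path-based Delta table.

    Returns True if a directory was removed, False if nothing existed.
    Refuses to touch anything outside allowed_root as a safety net.
    """
    root = Path(allowed_root).resolve()
    target = Path(table_path).resolve()
    if root not in target.parents:
        raise ValueError(f"{target} is not inside {root}")
    if not target.is_dir():
        return False
    # Removes the data files together with the _delta_log directory.
    shutil.rmtree(target)
    return True
```

In this notebook it would be called as `drop_path_table("/workspace/tables/products_tmp")` once the merge has finished. On object storage or HDFS a different deletion mechanism would be needed.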

## Data after UPSERT
`/workspace/tables/products`<br>
The data expected after upserting the source data into the target data.
| id | name | short_name | kind | material | price | description | delete_flg |
| --- | --- | --- | --- | --- | --- | --- | --- |
| 1 | Fantastic Granite Pizza | Shoes | Small | Cotton | 3067 | nothing | 0 |
| 2 | Rustic Soft Fish | Pants | Handmade | Steel | 7107 | nothing | 0 |
| 3 | Licensed Cotton Hat | Sausages | Fantastic | Metal | 9050 | nothing | 0 |
| 4 | Luxurious Bronze Towels | Chair | Intelligent | Plastic | 4967 | nothing | 0 |
| 5 | Generic Wooden Towels | Mouse | Small | Metal | 4428 | still | 0 |
| 6 | Handcrafted Cotton Sausages | Computer | 1 | 1 | 1 | update | 0 |
| 7 | Handmade Wooden Towels | Chips | 1 | 1 | 1 | update | 0 |
| 8 | Licensed Metal Fish | Computer | Refined | Rubber | 7437 | delete_flg | 1 |
| 9 | Electronic Concrete Chair | Cheese | Incredible | Metal | 1062 | delete_flg | 1 |
| 10 | Incredible Metal Computer | Pants | Unbranded | Rubber | 6368 | delete_flg | 1 |
| 11 | Oriental Rubber Gloves | Hat | Electronic | Metal | 7357 | insert | 0 |
| 12 | Elegant Soft Pizza | Hat | Practical | Cotton | 6231 | insert | 0 |
| 13 | Sleek Plastic Salad | Soap | Refined | Frozen | 9682 | insert | 0 |
| 14 | Refined Fresh Sausages | Tuna | Ergonomic | Granite | 3528 | insert | 0 |
| 15 | Elegant Steel Ball | Bacon | Rustic | Metal | 9283 | insert | 0 |
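To make the expected result easier to reason about, the upsert can be modeled in plain Python as a merge keyed on `id`. This is only an illustration of the semantics, not how Delta Lake performs the merge. The helper `upsert_rows` is hypothetical, and the rows are a subset of the tables above with some columns omitted.

```python
def upsert_rows(target, source, key="id"):
    """Return target rows with source rows merged in by key.

    Matching keys are overwritten by the source row (update);
    new keys are added (insert). Nothing is physically removed.
    """
    merged = {row[key]: dict(row) for row in target}
    for row in source:
        merged[row[key]] = dict(row)
    return sorted(merged.values(), key=lambda r: r[key])


target = [
    {"id": 1, "kind": "Small", "price": 3067, "delete_flg": 0},
    {"id": 7, "kind": "Small", "price": 7796, "delete_flg": 0},
    {"id": 8, "kind": "Refined", "price": 7437, "delete_flg": 0},
]
source = [
    {"id": 7, "kind": "1", "price": 1, "delete_flg": 0},               # update
    {"id": 8, "kind": "Refined", "price": 7437, "delete_flg": 1},      # logical delete
    {"id": 11, "kind": "Electronic", "price": 7357, "delete_flg": 0},  # insert
]

result = upsert_rows(target, source)
for row in result:
    print(row)
```

Row 1 is untouched, row 7 takes the source values, row 8 stays in the table with `delete_flg` set to 1, and row 11 is added.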

# Processing flow
```mermaid
flowchart TB
  TD([name : Target data\npath : /workspace/csv/products_10.csv\ntype : csv])
  SD([name : Source data\npath : /workspace/csv/products_15.csv\ntype : csv])

  DFTD[name : csvDf\ntype : DataFrame]
  DFSD[name : csvDfTmp\ntype : DataFrame]

  DTTD[(name : products\npath : /workspace/tables/products\ntype : Delta table)]
  DTSD[(name : products_tmp\npath : /workspace/tables/products_tmp\ntype : Delta table)]

  TD-->|spark.read|DFTD
  SD-->|spark.read|DFSD

  DFTD-->|write\noverwrite|DTTD
  DFSD-->|write\noverwrite|DTSD

  DTSD-->|upsert\nkey = id|DTTD
```
※ The processing flow depends on your requirements, so treat this diagram as a reference only.

In [1]:
import pyspark
from delta import *
from delta.tables import *
import packages.modules as modules
from pyspark.sql.functions import *
from pyspark.sql.types import StringType, StructField, StructType

builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()
spark.conf.set("spark.sql.debug.maxToStringFields", 1000)

workspace = "/workspace"
file_name = "products_10"
file_name_tmp = "products_15"
file_ext = ".csv"
data_path = workspace + "/csv/" + file_name + file_ext
data_path_tmp = workspace + "/csv/" + file_name_tmp + file_ext
delta_table_name = "products"
delta_table_name_tmp = "products_tmp"
delta_table_path = workspace + "/tables/" + delta_table_name
delta_table_path_tmp = workspace + "/tables/" + delta_table_name_tmp

:: loading settings :: url = jar:file:/usr/local/lib/python3.11/dist-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/vscode/.ivy2/cache
The jars for the packages stored in: /home/vscode/.ivy2/jars
io.delta#delta-spark_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-6b0fab98-d165-439c-be94-02af12ed7fe6;1.0
	confs: [default]
	found io.delta#delta-spark_2.12;3.0.0 in central
	found io.delta#delta-storage;3.0.0 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
:: resolution report :: resolve 217ms :: artifacts dl 7ms
	:: modules in use:
	io.delta#delta-spark_2.12;3.0.0 from central in [default]
	io.delta#delta-storage;3.0.0 from central in [default]
	org.antlr#antlr4-runtime;4.9.3 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   3   |   0 

# Start processing
Load the products_10 CSV data.

In [2]:
csvDf = spark.read.option("delimiter", ",").option("header", "true").csv(data_path)
csvDf.printSchema()
csvDf.show(100)

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- short_name: string (nullable = true)
 |-- kind: string (nullable = true)
 |-- material: string (nullable = true)
 |-- price: string (nullable = true)
 |-- description: string (nullable = true)
 |-- delete_flg: string (nullable = true)

+---+--------------------+----------+-----------+--------+-----+-----------+----------+
| id|                name|short_name|       kind|material|price|description|delete_flg|
+---+--------------------+----------+-----------+--------+-----+-----------+----------+
|  1|Fantastic Granite...|     Shoes|      Small|  Cotton| 3067|    nothing|         0|
|  2|    Rustic Soft Fish|     Pants|   Handmade|   Steel| 7107|    nothing|         0|
|  3| Licensed Cotton Hat|  Sausages|  Fantastic|   Metal| 9050|    nothing|         0|
|  4|Luxurious Bronze ...|     Chair|Intelligent| Plastic| 4967|    nothing|         0|
|  5|Generic Wooden To...|     Mouse|      Small|   Metal| 4428|    
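The schema printed above shows every column read as `string`, because no schema was supplied to the CSV reader. If typed columns are preferred, one option is to pass a DDL schema string to `DataFrameReader.schema`. The sketch below assumes integer types for `id`, `price`, and `delete_flg`. Those types, and the names `PRODUCT_COLUMNS`, `ddl_schema`, and `read_products_csv`, are assumptions and not part of the original notebook.

```python
# Column names come from the CSV header; the types are an assumption.
PRODUCT_COLUMNS = [
    ("id", "INT"),
    ("name", "STRING"),
    ("short_name", "STRING"),
    ("kind", "STRING"),
    ("material", "STRING"),
    ("price", "INT"),
    ("description", "STRING"),
    ("delete_flg", "INT"),
]


def ddl_schema(columns):
    """Build a DDL-style schema string such as 'id INT, name STRING'."""
    return ", ".join(f"{name} {dtype}" for name, dtype in columns)


def read_products_csv(spark, path):
    """Read a products CSV with explicit column types.

    `spark` is an existing SparkSession, such as the one created in cell 1.
    """
    return (
        spark.read
        .option("header", "true")
        .schema(ddl_schema(PRODUCT_COLUMNS))
        .csv(path)
    )
```

Both CSV files would need to be read with the same schema so that the source and target tables stay compatible for the merge. The rest of this notebook keeps the all-string schema.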

Load the products_15 CSV data.

In [3]:
csvDfTmp = spark.read.option("delimiter", ",").option("header", "true").csv(data_path_tmp)
csvDfTmp.printSchema()
csvDfTmp.show(100)

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- short_name: string (nullable = true)
 |-- kind: string (nullable = true)
 |-- material: string (nullable = true)
 |-- price: string (nullable = true)
 |-- description: string (nullable = true)
 |-- delete_flg: string (nullable = true)

+---+--------------------+----------+----------+--------+-----+-----------+----------+
| id|                name|short_name|      kind|material|price|description|delete_flg|
+---+--------------------+----------+----------+--------+-----+-----------+----------+
|  5|Generic Wooden To...|     Mouse|     Small|   Metal| 4428|      still|         0|
|  6|Handcrafted Cotto...|  Computer|         1|       1|    1|     update|         0|
|  7|Handmade Wooden T...|     Chips|         1|       1|    1|     update|         0|
|  8| Licensed Metal Fish|  Computer|   Refined|  Rubber| 7437| delete_flg|         1|
|  9|Electronic Concre...|    Cheese|Incredible|   Metal| 1062| delete_flg|

# Write the data to Delta tables
Using `saveAsTable` in the local environment raises an error.<br>
The cause is still under investigation.<br>
For now, `save` is used to write the data instead.

In [4]:
csvDf.write\
  .format("delta")\
  .mode("overwrite")\
  .partitionBy("kind")\
  .save(delta_table_path)

                                                                                

In [5]:
csvDfTmp.write\
  .format("delta")\
  .mode("overwrite")\
  .partitionBy("kind")\
  .save(delta_table_path_tmp)

                                                                                

# Upsert into the Delta table

In [6]:
deltaTable = DeltaTable.forPath(spark, delta_table_path)
deltaTableTmp = DeltaTable.forPath(spark, delta_table_path_tmp)
dfTemp = deltaTableTmp.toDF()

deltaTable.alias("products")\
  .merge(
    source = dfTemp.alias("tableTmp"),
    condition = "products.id = tableTmp.id"
  )\
  .whenMatchedUpdateAll()\
  .whenNotMatchedInsertAll()\
  .execute()
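The merge in this cell can also be wrapped in a small reusable function. The sketch below uses the same `DeltaTable` calls as the cell and adds a helper that builds the join condition from a list of key columns. The names `merge_condition` and `upsert_delta` are hypothetical, and `delta` is imported lazily so that the helper can be used without Spark.

```python
def merge_condition(keys, target_alias="products", source_alias="tableTmp"):
    """Build a merge condition such as 'products.id = tableTmp.id'."""
    if not keys:
        raise ValueError("at least one key column is required")
    return " AND ".join(
        f"{target_alias}.{key} = {source_alias}.{key}" for key in keys
    )


def upsert_delta(spark, target_path, source_path, keys):
    """Upsert the Delta table at source_path into the one at target_path."""
    # Imported here so merge_condition stays usable without Spark installed.
    from delta.tables import DeltaTable

    target = DeltaTable.forPath(spark, target_path)
    source_df = DeltaTable.forPath(spark, source_path).toDF()
    (
        target.alias("products")
        .merge(source_df.alias("tableTmp"), merge_condition(keys))
        .whenMatchedUpdateAll()      # matching ids take the source values
        .whenNotMatchedInsertAll()   # new ids are inserted
        .execute()
    )
```

With the variables from cell 1, the call would be `upsert_delta(spark, delta_table_path, delta_table_path_tmp, ["id"])`. The source should contain at most one row per key, since a merge in which several source rows match the same target row is rejected.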

                                                                                

# Read the Delta table

In [7]:
df = spark.read.format("delta").load(delta_table_path)
df.createOrReplaceTempView(delta_table_name)
spark.conf.set('dq.val.delta_table_name', delta_table_name)
spark.sql(
  """
    SELECT
      id,
      name,
      short_name,
      kind,
      material,
      price,
      description,
      delete_flg
    FROM ${dq.val.delta_table_name}
    ORDER BY CAST(id AS BIGINT) ASC
  """
).show(100)

+---+--------------------+----------+-----------+--------+-----+-----------+----------+
| id|                name|short_name|       kind|material|price|description|delete_flg|
+---+--------------------+----------+-----------+--------+-----+-----------+----------+
|  1|Fantastic Granite...|     Shoes|      Small|  Cotton| 3067|    nothing|         0|
|  2|    Rustic Soft Fish|     Pants|   Handmade|   Steel| 7107|    nothing|         0|
|  3| Licensed Cotton Hat|  Sausages|  Fantastic|   Metal| 9050|    nothing|         0|
|  4|Luxurious Bronze ...|     Chair|Intelligent| Plastic| 4967|    nothing|         0|
|  5|Generic Wooden To...|     Mouse|      Small|   Metal| 4428|      still|         0|
|  6|Handcrafted Cotto...|  Computer|          1|       1|    1|     update|         0|
|  7|Handmade Wooden T...|     Chips|          1|       1|    1|     update|         0|
|  8| Licensed Metal Fish|  Computer|    Refined|  Rubber| 7437| delete_flg|         1|
|  9|Electronic Concre...|    Ch
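Because deletion is logical, rows with `delete_flg` set to 1 remain in the table after the upsert, and readers have to filter them out themselves. The following sketch builds such a query as a string. It assumes `delete_flg` is stored as a string, as in the schema printed earlier. The function name `active_rows_query` is hypothetical.

```python
def active_rows_query(view_name, flag_column="delete_flg"):
    """Return a SQL query selecting only rows that are not logically deleted."""
    for identifier in (view_name, flag_column):
        # Guard against malformed names, since they are interpolated into SQL.
        if not identifier.isidentifier():
            raise ValueError(f"invalid identifier: {identifier!r}")
    return (
        f"SELECT * FROM {view_name} "
        f"WHERE {flag_column} = '0' "
        f"ORDER BY CAST(id AS BIGINT) ASC"
    )


print(active_rows_query("products"))
```

With the temp view registered in cell 7, the query could be run as `spark.sql(active_rows_query(delta_table_name)).show(100)`.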