# Upsert Data Using the MERGE Operation

> 기 존재하는 레코드에 대해서는 업데이트를 수행하고, 존재하지 않는 경우 인서트를 수행

In [1]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
from IPython.display import display, display_pretty, clear_output, JSON

from delta import *

# 공통 데이터 위치
home_jovyan = "/home/jovyan"
work_data = f"{home_jovyan}/work/data"
work_dir=!pwd
work_dir = work_dir[0]
warehouse_dir = f"{work_dir}/spark-warehouse"

# Create spark session with hive enabled
builder = (
    SparkSession
    .builder
    .appName("pyspark-notebook")
    .config("spark.sql.session.timeZone", "Asia/Seoul")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.sql.catalogImplementation", "hive")
    .config("spark.sql.warehouse.dir", warehouse_dir)
    .enableHiveSupport()
)

In [2]:
# 델타 레이크 생성시에 반드시 `configure_spark_with_delta_pip` 구성을 통해 실행되어야 정상적인 델타 의존성이 로딩됩니다
spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [3]:

# 노트북에서 테이블 형태로 데이터 프레임 출력을 위한 설정을 합니다
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # display enabled
spark.conf.set("spark.sql.repl.eagerEval.truncate", 100) # display output columns size

# 로컬 환경 최적화
spark.conf.set("spark.sql.shuffle.partitions", 5) # the number of partitions to use when shuffling data for joins or aggregations.
spark.conf.set("spark.sql.streaming.forceDeleteTempCheckpointLocation", "true")
spark

In [4]:
def sql(queries, num_rows = 20):
    for query in queries.split(";"):
        spark.sql(query).show(num_rows, truncate=False)

def ls(command):
    !ls -al {command}

def cat(filename):
    !cat {filename}

def grep(keyword, filename):
    !grep -i {keyword} {filename}

def grep_and_json(keyword, filename):
    !grep {keyword} {filename} | python -m json.tool

def grep_sed_json(keyword, lineno, filename):
    !grep {keyword} {filename} | sed -n {lineno}p | python -m json.tool

In [5]:
sql("show databases ; use taxidb ; show tables")

+---------+
|namespace|
+---------+
|default  |
|taxidb   |
+---------+

++
||
++
++

+---------+------------------+-----------+
|namespace|tableName         |isTemporary|
+---------+------------------+-----------+
|taxidb   |greentaxis        |false      |
|taxidb   |yellowtaxis       |false      |
|taxidb   |yellowtaxis_append|false      |
+---------+------------------+-----------+



In [11]:
sql("select count(1) from greentaxis")

+--------+
|count(1)|
+--------+
|450625  |
+--------+



In [12]:
ls("/home/jovyan/work/spark-warehouse/taxidb.db/greentaxis/_delta_log/*.json")

-rwxrwxrwx 1 jovyan 1000  2346 Aug 28 05:07 /home/jovyan/work/spark-warehouse/taxidb.db/greentaxis/_delta_log/00000000000000000000.json
-rwxrwxrwx 1 jovyan 1000 15695 Aug 28 05:08 /home/jovyan/work/spark-warehouse/taxidb.db/greentaxis/_delta_log/00000000000000000001.json
-rwxrwxrwx 1 jovyan 1000  4163 Aug 28 07:47 /home/jovyan/work/spark-warehouse/taxidb.db/greentaxis/_delta_log/00000000000000000002.json
-rwxrwxrwx 1 jovyan 1000  2332 Aug 28 08:26 /home/jovyan/work/spark-warehouse/taxidb.db/greentaxis/_delta_log/00000000000000000003.json


In [13]:
sql("describe history greentaxis ; describe greentaxis", 30)

+-------+-----------------------+------+--------+---------------------------------+------------------------------------------------------------------------------------------------------------------------+----+--------+---------+-----------+--------------+-------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+-----------------------------------+
|version|timestamp              |userId|userName|operation                        |operationParameters                                                                                                     |job |notebook|clusterId|readVersion|isolationLevel|isBlindAppend|operationMetrics                                                                                                                                                                    |userMetadata|engineInfo                         |


In [14]:
sql("""
SELECT
    INPUT_FILE_NAME() as filename_record_is_located,
    VendorId,
    RatecodeID,
    DOLocationID,
    passenger_count
FROM
    taxidb.greentaxis
WHERE
    VendorId = 2 and RatecodeID = 6
""")

+-------------------------------------------------------------------------------------------------------------------------------+--------+----------+------------+---------------+
|filename_record_is_located                                                                                                     |VendorId|RatecodeID|DOLocationID|passenger_count|
+-------------------------------------------------------------------------------------------------------------------------------+--------+----------+------------+---------------+
|file:/home/jovyan/work/spark-warehouse/taxidb.db/greentaxis/part-00000-b43f38af-57c8-415c-ada4-c17fb8195df6-c000.snappy.parquet|2       |6         |193         |10             |
|file:/home/jovyan/work/spark-warehouse/taxidb.db/greentaxis/part-00006-cb453028-6940-422f-8622-596782a99a75-c000.snappy.parquet|2       |6         |55          |1              |
+--------------------------------------------------------------------------------------------------------

* 삭제했던 2건의 레코드는 time travel 통해서 읽어와서 하나의 데이터 프레임을 만들어서 인서트
* 업데이트 했던 1건의 레코드는 원래 값으로 돌리는 방식으로 업데이트

In [18]:
# 현재 버전의 델타/스파크에서는 SQL 수준에서 Time Travel 이 되지 않아 API 통해서 실행
version5 = spark.read.format("delta").table("taxidb.greentaxis")
version1 = spark.read.format("delta").option("versionAsOf", "1").table("taxidb.greentaxis")

In [21]:
c5 = version5.count()
c1 = version1.count()
print(c5, c1)

450625 450627


In [28]:
update5 = spark.read.format("delta").table("taxidb.greentaxis").where(expr("vendorid = 2 and ratecodeid = 6 and passenger_count = 10")).withColumn("passenger_count_mod", expr("passenger_count / 10").cast("int")).drop("passenger_count").withColumnRenamed("passenger_count_mod", "passenger_count")
update1 = spark.read.format("delta").option("versionAsOf", "1").table("taxidb.greentaxis").where(expr("vendorid = 1 and ratecodeid = 6"))
update = update5.union(update1)
update.printSchema()
update.show()

root
 |-- VendorId: integer (nullable = true)
 |-- lpep_pickup_datetime: string (nullable = true)
 |-- lpep_dropoff_datetime: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- ehail_fee: string (nullable = true)
 |-- improvement_surcharge: string (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: double (nullable = true)
 |-- trip_type: integer (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- passenger_count: double (nullable = true)

+--------+--------------------+---------------------+------------------+----------+---------

In [54]:
sql("select VendorId, lpep_pickup_datetime, lpep_dropoff_datetime, store_and_fwd_flag, RatecodeID, PULocationID, DOLocationID, passenger_count, trip_distance, fare_amount, extra, mta_tax, tip_amount, tolls_amount, ehail_fee, improvement_surcharge, total_amount, payment_type, trip_type, congestion_surcharge  from greentaxis")

+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|VendorId|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|2       |2019-12-19 17:12:23 |2019-12-19 17:19:32  |N                 |1         |181         |25          |1              |1.28         |7.0        |1.0  |0.5   

In [49]:
sql("select count(1) from greentaxis")

+--------+
|count(1)|
+--------+
|450625  |
+--------+



In [53]:
sql("select vendorid, ratecodeid, PULocationID, DOLocationID from greentaxis where ratecodeid = 6")

+--------+----------+------------+------------+
|vendorid|ratecodeid|PULocationID|DOLocationID|
+--------+----------+------------+------------+
|2       |6         |193         |193         |
|2       |6         |108         |55          |
+--------+----------+------------+------------+



> union 시에 컬럼의 순서가 다르면 타입이 다 꼬이는 문제가 생기므로 모든 컬럼의 순서와 타입을 맞추고 union 해야만 한다

In [62]:
df1 = spark.read.format("delta").table("taxidb.greentaxis").where(expr("vendorid = 2 and ratecodeid = 6 and passenger_count = 10")).withColumn("passenger_count_mod", expr("passenger_count / 10")).drop("passenger_count")
df1.printSchema()

df2 = spark.read.format("delta").option("versionAsOf", "1").table("taxidb.greentaxis").where(expr("vendorid = 1 and ratecodeid = 6")).withColumn("passenger_count_mod", col("passenger_count")).drop("passenger_count")
df2.printSchema()

df2 = df1.union(df2).withColumn("passenger_count", col("passenger_count_mod").cast("int")).drop("passenger_count_mod")
update = df2.selectExpr("VendorId", "lpep_pickup_datetime", "lpep_dropoff_datetime", "store_and_fwd_flag", "RatecodeID", "PULocationID", "DOLocationID", "passenger_count", "trip_distance", "fare_amount", "extra", "mta_tax", "tip_amount", "tolls_amount", "ehail_fee", "improvement_surcharge", "total_amount", "payment_type", "trip_type", "congestion_surcharge")
update.printSchema()
update.show()

root
 |-- VendorId: integer (nullable = true)
 |-- lpep_pickup_datetime: string (nullable = true)
 |-- lpep_dropoff_datetime: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- ehail_fee: string (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- trip_type: integer (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- passenger_count_mod: double (nullable = true)

root
 |-- VendorId: integer (nullable = true)
 |-- lpep_pickup_datetime: string (nullab

### The MERGE Statement
> 기존 데이터로부터 가져온 데이터를 활용하여 MERGE 작업을 수행합니다

In [63]:
target = "taxidb.greentaxis"
source = "green_update"
update.createOrReplaceTempView(source)
sql(f"select * from {source}")
sql(f"describe formatted {source}")

+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|VendorId|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|2       |2019-12-20 14:38:21 |2019-12-20 14:38:28  |N                 |6         |193         |193         |1              |0.0          |2.5        |0.0  |0.5   

In [64]:
# merge 반환 데이터프레임에 실행 결과가 포함되므로, 결과 dataframe 을 출력하는 습관을 가지자
sql(f"""
MERGE INTO {target} AS target USING {source} AS source
    ON target.VendorId = source.VendorId AND target.RatecodeID = source.RatecodeID AND target.PULocationID = source.PULocationID
WHEN MATCHED
    THEN UPDATE SET target.passenger_count = source.passenger_count
WHEN NOT MATCHED
    THEN INSERT (VendorId, lpep_pickup_datetime, lpep_dropoff_datetime, store_and_fwd_flag, RatecodeID, PULocationID, DOLocationID, passenger_count, trip_distance, fare_amount, extra, mta_tax, tip_amount, tolls_amount, ehail_fee, improvement_surcharge, total_amount, payment_type, trip_type, congestion_surcharge)
    VALUES (VendorId, lpep_pickup_datetime, lpep_dropoff_datetime, store_and_fwd_flag, RatecodeID, PULocationID, DOLocationID, passenger_count, trip_distance, fare_amount, extra, mta_tax, tip_amount, tolls_amount, ehail_fee, improvement_surcharge, total_amount, payment_type, trip_type, congestion_surcharge)
""")

++
||
++
++



In [68]:
ls("/home/jovyan/work/spark-warehouse/taxidb.db/greentaxis/_delta_log/*.json")
# cat("/home/jovyan/work/spark-warehouse/taxidb.db/greentaxis/_delta_log/00000000000000000004.json")
grep_sed_json("add", 1, "/home/jovyan/work/spark-warehouse/taxidb.db/greentaxis/_delta_log/00000000000000000004.json")
grep_sed_json("add", 2, "/home/jovyan/work/spark-warehouse/taxidb.db/greentaxis/_delta_log/00000000000000000004.json")

-rwxrwxrwx 1 jovyan 1000  2346 Aug 28 05:07 /home/jovyan/work/spark-warehouse/taxidb.db/greentaxis/_delta_log/00000000000000000000.json
-rwxrwxrwx 1 jovyan 1000 15695 Aug 28 05:08 /home/jovyan/work/spark-warehouse/taxidb.db/greentaxis/_delta_log/00000000000000000001.json
-rwxrwxrwx 1 jovyan 1000  4163 Aug 28 07:47 /home/jovyan/work/spark-warehouse/taxidb.db/greentaxis/_delta_log/00000000000000000002.json
-rwxrwxrwx 1 jovyan 1000  2332 Aug 28 08:26 /home/jovyan/work/spark-warehouse/taxidb.db/greentaxis/_delta_log/00000000000000000003.json
-rwxrwxrwx 1 jovyan 1000  4191 Aug 28 12:17 /home/jovyan/work/spark-warehouse/taxidb.db/greentaxis/_delta_log/00000000000000000004.json
{
    "add": {
        "path": "part-00000-af0a6542-7fa8-4be2-ab1d-f21c324d9669-c000.snappy.parquet",
        "partitionValues": {},
        "size": 429144,
        "modificationTime": 1724847433868,
        "dataChange": true,
        "stats": "{\"numRecords\":19720,\"minValues\":{\"VendorId\":1,\"lpep_pickup_datetime

In [69]:
sql("select count(1) from greentaxis")

+--------+
|count(1)|
+--------+
|450627  |
+--------+



In [71]:
# 삭제된 2개가 insert, 변경된 1개가 update
sql("select vendorid, ratecodeid, PULocationID, DOLocationID, passenger_count from greentaxis where ratecodeid = 6")

+--------+----------+------------+------------+---------------+
|vendorid|ratecodeid|PULocationID|DOLocationID|passenger_count|
+--------+----------+------------+------------+---------------+
|2       |6         |108         |55          |1              |
|2       |6         |193         |193         |1              |
|1       |6         |31          |31          |1              |
|1       |6         |243         |244         |1              |
+--------+----------+------------+------------+---------------+



> 만약에 join 에 맞지 않는 상황이 발생하면 어떻게 되는가? 예를 들어 N개 이상 JOIN 되는 경우는?

* yellowtaxis 테이블을 활용해서 merge 현상을 테스트 해보자

In [78]:
sql("select input_file_name(), * from yellowtaxis")

+-----------------------------------------------------------------------------------------------------------------+-------+--------+-------------------+-----------------------+----------------+--------------+---------+-------------------+--------------+------------+----------+-----------+-----------+----------+-----+------+---------+-----------+--------------------+
|input_file_name()                                                                                                |RideId |VendorId|PickupTime         |DropTime               |PickupLocationId|DropLocationId|CabNumber|DriverLicenseNumber|PassengerCount|TripDistance|RatecodeId|PaymentType|TotalAmount|FareAmount|Extra|MtaTax|TipAmount|TollsAmount|ImprovementSurcharge|
+-----------------------------------------------------------------------------------------------------------------+-------+--------+-------------------+-----------------------+----------------+--------------+---------+-------------------+--------------+---------

In [80]:
df = spark.sql("select 9999995 as RideId, 1 as VendorId, 5.5 PassengerCount")
df.printSchema()
df.createOrReplaceTempView("yellow_update")
sql("select * from yellow_update")

root
 |-- RideId: integer (nullable = false)
 |-- VendorId: integer (nullable = false)
 |-- PassengerCount: decimal(2,1) (nullable = false)

+-------+--------+--------------+
|RideId |VendorId|PassengerCount|
+-------+--------+--------------+
|9999995|1       |5.5           |
+-------+--------+--------------+



In [82]:
# source 테이블은 1개의 레코드이므로 insert 발생하지 않으므로 구현하지 않고, matching 되는 레코드는 2개이므로 결과의 PassengerCount 는?
target = "yellowtaxis"
source = "yellow_update"

spark.sql(f"""
MERGE INTO {target} AS target USING {source} AS source
    ON target.RideId = source.RideId AND target.VendorId = source.VendorId
WHEN MATCHED
    THEN UPDATE SET target.PassengerCount = source.PassengerCount
""")

In [86]:
# 결과는 2개가 매칭 되어서 2개가 업데이트 되었고, 더불어 5.5라는 double 값을 integer 에 업데이트 시에는 정수로 변경되어 저장된다
sql("select * from yellowtaxis")

+-------+--------+-------------------+-----------------------+----------------+--------------+---------+-------------------+--------------+------------+----------+-----------+-----------+----------+-----+------+---------+-----------+--------------------+
|RideId |VendorId|PickupTime         |DropTime               |PickupLocationId|DropLocationId|CabNumber|DriverLicenseNumber|PassengerCount|TripDistance|RatecodeId|PaymentType|TotalAmount|FareAmount|Extra|MtaTax|TipAmount|TollsAmount|ImprovementSurcharge|
+-------+--------+-------------------+-----------------------+----------------+--------------+---------+-------------------+--------------+------------+----------+-----------+-----------+----------+-----+------+---------+-----------+--------------------+
|9999995|1       |2019-11-01 09:00:00|2019-11-01 09:02:23.573|65              |71            |TAC304   |453987             |5             |4.5         |1         |1          |20.34      |15.0      |0.5  |0.4   |2.0      |2.0        |1.

In [92]:
sql("describe history yellowtaxis")
spark.read.format("delta").option("versionAsOf", 3).table("yellowtaxis").groupBy("passengercount").count()

+-------+-----------------------+------+--------+------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+-----------------------------------+
|version|timestamp              |userId|userName|operation   |operationParameters                                                                                                                                                 |job |notebook|clusterId|readVersion|isolationLevel|isBlindAppend|operationMetrics                                                  

passengercount,count
2,4


In [93]:
spark.read.format("delta").option("versionAsOf", 4).table("yellowtaxis").groupBy("passengercount").count()

passengercount,count
5,2
2,2


In [9]:
spark.sql("describe history taxidb.yellowtaxis").select("version", "operation", "operationParameters", "operationMetrics").show(truncate=False)

+-------+------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|version|operation   |operationParameters                                                                                                                                                 |operationMetrics                                                                                                                                                                                                                                                                                           |
+-------+------------+--

```json
// operationParameters
{
    predicate -> ((target.RideId = source.RideId) AND (target.VendorId = source.VendorId))
    , matchedPredicates -> [{"actionType":"update"}]
    , notMatchedPredicates -> []
}

// operationMetrics
{
    numTargetRowsCopied -> 0
    , numTargetRowsDeleted -> 0
    , numTargetFilesAdded -> 2
    , executionTimeMs -> 973
    , numTargetRowsInserted -> 0
    , scanTimeMs -> 434
    , numTargetRowsUpdated -> 2
    , numOutputRows -> 2
    , numTargetChangeFilesAdded -> 0
    , numSourceRows -> 1
    , numTargetFilesRemoved -> 2
    , rewriteTimeMs -> 466
}
```

In [12]:
# MERGE 연산에서 발생하는 outer join 이해

# a 테이블에 해당하는 데이터 생성
data_a = [
    (1, "Alice"),
    (2, "Bob"),
    (3, "Charlie")
]

# b 테이블에 해당하는 데이터 생성
data_b = [
    (2, "Bob"),
    (3, "Charlie"),
    (4, "David")
]

# a와 b DataFrame 생성
df_a = spark.createDataFrame(data_a, ["aid", "aname"])
df_b = spark.createDataFrame(data_b, ["bid", "bname"])

# Temporary View로 등록하여 SQL 사용 가능하게 설정
df_a.createOrReplaceTempView("a")
df_b.createOrReplaceTempView("b")

In [18]:
# FULL OUTER JOIN을 사용한 SQL 쿼리
inner_join = spark.sql("""
    SELECT *
    FROM a
     JOIN b
    ON a.aid = b.bid
""")

# 결과 출력
inner_join.show()

# FULL OUTER JOIN을 사용한 SQL 쿼리
outer_join = spark.sql("""
    SELECT *
    FROM a
    FULL OUTER JOIN b
    ON a.aid = b.bid
    WHERE a.aid IS NULL OR b.bid IS NULL
""")

# 결과 출력
outer_join.show()

+---+-------+---+-------+
|aid|  aname|bid|  bname|
+---+-------+---+-------+
|  2|    Bob|  2|    Bob|
|  3|Charlie|  3|Charlie|
+---+-------+---+-------+

+----+-----+----+-----+
| aid|aname| bid|bname|
+----+-----+----+-----+
|   1|Alice|null| null|
|null| null|   4|David|
+----+-----+----+-----+



In [19]:
# a, b 모두 있는 데이터 - UPDATe
inner_join.show()

# a 에만 있는 데이터 - INSERT
outer_join.where("bid is null").show()

# b 에만 있는 데이터 - DELETE
outer_join.where("aid is null").show()

+---+-------+---+-------+
|aid|  aname|bid|  bname|
+---+-------+---+-------+
|  2|    Bob|  2|    Bob|
|  3|Charlie|  3|Charlie|
+---+-------+---+-------+

+---+-----+----+-----+
|aid|aname| bid|bname|
+---+-----+----+-----+
|  1|Alice|null| null|
+---+-----+----+-----+

+----+-----+---+-----+
| aid|aname|bid|bname|
+----+-----+---+-----+
|null| null|  4|David|
+----+-----+---+-----+

