In [6]:
import polars as pl

delta_path = "./delta/user"

In [7]:
# Create Delta Table from Parquet File
pl.read_parquet("./data/users.parquet").write_delta("./delta/users", mode="overwrite")

In [8]:
df = pl.read_delta("./delta/users")
df

user_id,name,age,score
i64,str,i64,f64
1,"""Alice""",25,87.5
2,"""Bob""",32,91.2
3,"""Charlie""",29,78.4
4,"""David""",41,85.9
5,"""Eve""",36,92.3


# Mode Append - Append New Row to Delta Table

> 새로 생성된 데이터 저장

#### 기존 방식
1. 기존 데이터를 모두 읽음
2. 새로운 데이터를 추가하여 전체 데이터를 다시 씀

→ 최종적으로 기존 Parquet1: ROW(5), New Parquet2: ROW(7)

#### Delta Lake 방식
1. 기존 데이터를 모두 읽음
2. 새로운 데이터만을 가진 Parquet 파일을 생성
3. Delta Log 갱신

→ 최종적으로 기존 Parquet1: ROW(5), New Parquet2: ROW(2)

In [9]:
df_new_row = pl.DataFrame({
    "name": ["Sumin", "Lee"],
    "user_id": [6, 7],
    "age": [29, 31],
    "score": [88.5, 92.0],
})

In [10]:
df_new_row.write_delta("./delta/users", mode="append")  # Append Mode

In [14]:
created_parquet = pl.read_parquet(
    "./delta/users/part-00000-d976ddfe-7619-4bd2-b72b-3e0ae9cc7db8-c000.snappy.parquet")

In [16]:
created_parquet

user_id,name,age,score
i64,str,i64,f64
6,"""Sumin""",29,88.5
7,"""Lee""",31,92.0


In [17]:
result_df = pl.read_delta("./delta/users")
result_df

user_id,name,age,score
i64,str,i64,f64
6,"""Sumin""",29,88.5
7,"""Lee""",31,92.0
1,"""Alice""",25,87.5
2,"""Bob""",32,91.2
3,"""Charlie""",29,78.4
4,"""David""",41,85.9
5,"""Eve""",36,92.3


# Mode Overwrite - Overwrite Delta Table with Filtered Data

> 기존 데이터를 모두 덮어쓰는 방식

Deletion Vector가 없는 Copy-on-Write 방식일 때 데이터를 삭제하거나 수정하게 되면 해당 방식을 사용할 수 있음

#### 동작 방식
1. 기존 데이터를 모두 읽음
2. 필터링된 데이터로 새로운 Apache Arrow (in memory) 생성 (age > 30)
3. 새로운 Parquet 파일로 덮어쓰기

#### READ DELTA

```
BEFORE OVERWRITE
READ VERSION 0
READ VERSION 1 <-- append 2 rows

RESULT: 7 ROWS

---------------------------

AFTER OVERWRITE
SKIP VERSION 0
SKIP VERSION 1
READ VERSION 2 <-- overwrite with filtered rows

RESULT: ONLY ROWS WHERE age > 30
```

In [19]:
df = pl.read_delta("delta/users", version=0)
df

user_id,name,age,score
i64,str,i64,f64
1,"""Alice""",25,87.5
2,"""Bob""",32,91.2
3,"""Charlie""",29,78.4
4,"""David""",41,85.9
5,"""Eve""",36,92.3


In [20]:
filtered_df = df.filter(pl.col("age") > 30)
filtered_df

user_id,name,age,score
i64,str,i64,f64
2,"""Bob""",32,91.2
4,"""David""",41,85.9
5,"""Eve""",36,92.3


In [21]:
filtered_df.write_delta("delta/users", mode="overwrite")

In [22]:
# 새로운 Parquet 파일로 버저닝이 덮어씀
result_df = pl.read_delta("delta/users")
result_df

user_id,name,age,score
i64,str,i64,f64
2,"""Bob""",32,91.2
4,"""David""",41,85.9
5,"""Eve""",36,92.3


# Mode Merge - Overwrite Delta Table with Filtered Data

> 해당 동작은 source(df), target(files) 간의 merge 작업을 수행

Overwrite와 동일한 동작이라고 생각할 수 있지만,
만약 파티셔닝이 되어 있다면 특정 파티션만 새로운 버전을 생성할 수 있다는 장점이 있음

ex.) part_00000, part_00001, part_00002 파티션이 있을 때,
part_00001 파티션에 대해서만 새로운 버전을 생성하고, part_00000, part_00002 파티션은 기존 파일을 참조하게 만들 수 있음

#### MERGE ACTIONS

```python
df.write_delta(
    "./delta/users", # target
    mode="merge",
    delta_merge_options={
        "predicate": "target.user_id = source.user_id",
        "source_alias": "source",
        "target_alias": "target"
    }
).action().execute()
```

source: 새로운 데이터 (DataFrame)
target: 기존 데이터 (Delta Table)

- when_not_matched_by_source_delete: source에 없는 target row 삭제
- when_not_matched_by_source_update: source에 없는 target row 업데이트
- when_matched_update_all: 일치하는 row 모두 업데이트 (이 조건이 없다면 일치하는 로우는 pass될 수 있음)
- when_not_matched_insert: predicate 조건에 맞지 않는 row 삽입
- when_matched_delete: 일치하는 row 삭제

In [39]:
import polars as pl


def clean_df():
    """
    Clean Delta Table by Overwriting with Original Parquet Data
    :return:
    """
    pl.read_parquet("./data/users.parquet").write_delta("./delta/users", mode="overwrite")
    return pl.read_delta("./delta/users")

#### ACTION - WHEN MATCHED UPDATE ALL

In [50]:
df = clean_df()

In [51]:
new_users_df = pl.DataFrame({
    "user_id": [1, 10, 4],
    "name": ["Alice2", "Bob2", "Charlie2"],
    "age": [30, 25, 40]
})

new_users_df

user_id,name,age
i64,str,i64
1,"""Alice2""",30
10,"""Bob2""",25
4,"""Charlie2""",40


In [52]:
df

user_id,name,age,score
i64,str,i64,f64
1,"""Alice""",25,87.5
2,"""Bob""",32,91.2
3,"""Charlie""",29,78.4
4,"""David""",41,85.9
5,"""Eve""",36,92.3


In [53]:
new_users_df.write_delta(
    "./delta/users",
    mode="merge",
    delta_merge_options={
        "predicate": "target.user_id = source.user_id",
        "source_alias": "source",
        "target_alias": "target"
    }
).when_matched_update_all().execute()  # 일치하는 값만 업데이트, 기존 테이블의 불일치하는 값들은 보존되지만 source에 없는 값들은 반영되지 않음

{'num_source_rows': 3,
 'num_target_rows_inserted': 0,
 'num_target_rows_updated': 2,
 'num_target_rows_deleted': 0,
 'num_target_rows_copied': 3,
 'num_output_rows': 5,
 'num_target_files_scanned': 1,
 'num_target_files_skipped_during_scan': 0,
 'num_target_files_added': 1,
 'num_target_files_removed': 1,
 'execution_time_ms': 8,
 'scan_time_ms': 2,
 'rewrite_time_ms': 0}

In [54]:
result_df = pl.read_delta("./delta/users")
result_df

user_id,name,age,score
i64,str,i64,f64
1,"""Alice2""",30,87.5
4,"""Charlie2""",40,85.9
2,"""Bob""",32,91.2
3,"""Charlie""",29,78.4
5,"""Eve""",36,92.3


ACTION - WHEN_NOT_MATCHED_INSERT

In [55]:
df = clean_df()

new_users_df = pl.DataFrame({
    "user_id": [1, 10, 4],
    "name": ["Alice2", "Bob2", "Charlie2"],
    "age": [30, 25, 40]
})

new_users_df


user_id,name,age
i64,str,i64
1,"""Alice2""",30
10,"""Bob2""",25
4,"""Charlie2""",40


In [56]:
df

user_id,name,age,score
i64,str,i64,f64
1,"""Alice""",25,87.5
2,"""Bob""",32,91.2
3,"""Charlie""",29,78.4
4,"""David""",41,85.9
5,"""Eve""",36,92.3


In [60]:
new_users_df.write_delta(
    "./delta/users",
    mode="merge",
    delta_merge_options={
        "predicate": "target.user_id = source.user_id",
        "source_alias": "source",
        "target_alias": "target"
    }
).when_not_matched_insert_all().execute()  # 일치하지 않는 값 (user_id: 10) 삽입, 하지만 업데이트는 없기 때문이 (1, 4)의 이름은 바뀌지 않음

{'num_source_rows': 3,
 'num_target_rows_inserted': 0,
 'num_target_rows_updated': 0,
 'num_target_rows_deleted': 0,
 'num_target_rows_copied': 0,
 'num_output_rows': 0,
 'num_target_files_scanned': 2,
 'num_target_files_skipped_during_scan': 0,
 'num_target_files_added': 0,
 'num_target_files_removed': 0,
 'execution_time_ms': 8,
 'scan_time_ms': 1,
 'rewrite_time_ms': 0}

In [61]:
result_df = pl.read_delta("./delta/users")
result_df

user_id,name,age,score
i64,str,i64,f64
10,"""Bob2""",25,
1,"""Alice""",25,87.5
2,"""Bob""",32,91.2
3,"""Charlie""",29,78.4
4,"""David""",41,85.9
5,"""Eve""",36,92.3


#### ACTION - WHEN_NOT_MATCHED_BY_SOURCE_UPDATE

In [62]:
df = clean_df()

new_users_df = pl.DataFrame({
    "user_id": [1, 10, 4],
    "name": ["Alice2", "Bob2", "Charlie2"],
    "age": [30, 25, 40]
})

new_users_df

user_id,name,age
i64,str,i64
1,"""Alice2""",30
10,"""Bob2""",25
4,"""Charlie2""",40


In [63]:
df

user_id,name,age,score
i64,str,i64,f64
1,"""Alice""",25,87.5
2,"""Bob""",32,91.2
3,"""Charlie""",29,78.4
4,"""David""",41,85.9
5,"""Eve""",36,92.3


In [66]:
new_users_df.write_delta(
    "./delta/users",
    mode="merge",
    delta_merge_options={
        "predicate": "target.user_id = source.user_id",
        "source_alias": "source",
        "target_alias": "target"
    }
).when_not_matched_by_source_update(
    updates={
        "name": "'Unknown'",  # String 문자열 사용 시 ''로 감싸야 함
    }
).execute()  # 아이디가 일치하지 않는 user_id = (2, 3, 5)인 사용자 이름을 'Unknown'으로 업데이트 위와 같이 user_id = 10에 대한 처리가 없기 때문에 유실이 됨

{'num_source_rows': 3,
 'num_target_rows_inserted': 0,
 'num_target_rows_updated': 3,
 'num_target_rows_deleted': 0,
 'num_target_rows_copied': 2,
 'num_output_rows': 5,
 'num_target_files_scanned': 1,
 'num_target_files_skipped_during_scan': 0,
 'num_target_files_added': 1,
 'num_target_files_removed': 1,
 'execution_time_ms': 8,
 'scan_time_ms': 1,
 'rewrite_time_ms': 0}

In [67]:
result_df = pl.read_delta("./delta/users")
result_df

user_id,name,age,score
i64,str,i64,f64
1,"""Alice""",25,87.5
4,"""David""",41,85.9
2,"""Unknown""",32,91.2
3,"""Unknown""",29,78.4
5,"""Unknown""",36,92.3


#### Feature - Source에 없는 데이터는 UNKNOWN으로 업데이트하고, Source에만 있는 데이터는 UPSERT 하기 (1, 4이름 변경 및 10 추가)

In [72]:
df = clean_df()

new_users_df = pl.DataFrame({
    "user_id": [1, 10, 4],
    "name": ["Alice2", "Bob2", "Charlie2"],
    "age": [30, 25, 40]
})

new_users_df

user_id,name,age
i64,str,i64
1,"""Alice2""",30
10,"""Bob2""",25
4,"""Charlie2""",40


In [73]:
df

user_id,name,age,score
i64,str,i64,f64
1,"""Alice""",25,87.5
2,"""Bob""",32,91.2
3,"""Charlie""",29,78.4
4,"""David""",41,85.9
5,"""Eve""",36,92.3


In [74]:
(new_users_df.write_delta(
    "./delta/users",
    mode="merge",
    delta_merge_options={
        "predicate": "target.user_id = source.user_id",
        "source_alias": "source",
        "target_alias": "target"
    }
).when_not_matched_by_source_update(
    updates={
        "name": "'Unknown'",  # String 문자열 사용 시 ''로 감싸야 함
    }
).when_not_matched_insert_all()
 .when_matched_update_all().execute()
 )

{'num_source_rows': 3,
 'num_target_rows_inserted': 1,
 'num_target_rows_updated': 5,
 'num_target_rows_deleted': 0,
 'num_target_rows_copied': 0,
 'num_output_rows': 6,
 'num_target_files_scanned': 1,
 'num_target_files_skipped_during_scan': 0,
 'num_target_files_added': 1,
 'num_target_files_removed': 1,
 'execution_time_ms': 10,
 'scan_time_ms': 2,
 'rewrite_time_ms': 0}

In [75]:
result_df = pl.read_delta("./delta/users")
result_df

user_id,name,age,score
i64,str,i64,f64
1,"""Alice2""",30,87.5
4,"""Charlie2""",40,85.9
2,"""Unknown""",32,91.2
3,"""Unknown""",29,78.4
5,"""Unknown""",36,92.3
10,"""Bob2""",25,
