# End-to-End 시나리오 — 일별 배치부터 장애 복구까지

이 노트북에서는 지금까지 배운 모든 내용을 **실전 시나리오**로 종합 실습합니다.

### 시나리오

온라인 쇼핑몰의 주문 데이터 파이프라인:

1. **일별 배치 적재** — 매일 새로운 주문 데이터 INSERT
2. **상태 업데이트** — 배송 완료된 주문 UPDATE
3. **스키마 변경** — 새 컬럼 추가 (Schema Evolution)
4. **컴팩션** — Small File Problem 해결
5. **스냅샷 정리** — 유지보수 작업
6. **장애 복구** — 실수로 데이터 삭제 후 Time Travel로 복구

## 환경 설정

In [None]:
import sys
sys.path.append('..')

import time
from utils.spark_setup import create_spark_session
from utils.data_generator import generate_orders, to_spark_df
from utils.file_explorer import show_tree, snapshot_tree, diff_tree, count_files, total_size

In [None]:
spark = create_spark_session()

TABLE_NAME = "demo.lab.e2e_orders"
TABLE_PATH = "/home/jovyan/data/warehouse/lab/e2e_orders"

---
## Phase 1: 테이블 생성 + 일별 배치 적재

월별 Hidden Partitioning으로 테이블을 생성하고, 5일치 배치 데이터를 순차적으로 적재합니다.

In [None]:
spark.sql(f"DROP TABLE IF EXISTS {TABLE_NAME}")

spark.sql(f"""
CREATE TABLE {TABLE_NAME} (
    order_id BIGINT,
    customer_id BIGINT,
    product_name STRING,
    order_date DATE,
    amount DECIMAL(10,2),
    status STRING
) USING ICEBERG
PARTITIONED BY (months(order_date))
""")

print(f"테이블 생성 완료: {TABLE_NAME}")
print("파티셔닝: months(order_date)")

In [None]:
# Day 1~5: 일별 배치 적재 시뮬레이션
daily_batches = [
    {"day": 1, "start": "2024-01-01", "end": "2024-01-31", "records": 100},
    {"day": 2, "start": "2024-01-01", "end": "2024-01-31", "records": 80},
    {"day": 3, "start": "2024-02-01", "end": "2024-02-28", "records": 120},
    {"day": 4, "start": "2024-02-01", "end": "2024-02-28", "records": 90},
    {"day": 5, "start": "2024-03-01", "end": "2024-03-31", "records": 110},
]

offset = 1
for batch in daily_batches:
    orders = generate_orders(
        num_records=batch["records"],
        seed=batch["day"],
        start_date=batch["start"],
        end_date=batch["end"],
        id_offset=offset
    )
    df = to_spark_df(spark, orders)
    df.writeTo(TABLE_NAME).append()
    offset += batch["records"]
    print(f"Day {batch['day']}: {batch['records']}건 적재 ({batch['start']} ~ {batch['end']})")

total = spark.sql(f"SELECT COUNT(*) FROM {TABLE_NAME}").collect()[0][0]
print(f"\n총 레코드: {total}건")
print(f"파일 수: {count_files(TABLE_PATH)}")

In [None]:
# 현재 상태 확인
print("Phase 1 완료 — 파일 구조:")
print("=" * 60)
show_tree(TABLE_PATH, max_depth=3)

print("\n스냅샷 히스토리:")
spark.sql(f"""
SELECT snapshot_id, committed_at, operation,
       summary['added-data-files'] as added_files,
       summary['added-records'] as added_records
FROM {TABLE_NAME}.snapshots
ORDER BY committed_at
""").show(truncate=False)

---
## Phase 2: 상태 업데이트 (COW)

배송이 완료된 주문의 상태를 `shipped`로, 일부 주문을 `cancelled`로 업데이트합니다.

In [None]:
before = snapshot_tree(TABLE_PATH)

# pending → shipped
spark.sql(f"""
UPDATE {TABLE_NAME}
SET status = 'shipped'
WHERE status = 'pending' AND order_date < '2024-02-01'
""")
shipped = spark.sql(f"SELECT COUNT(*) FROM {TABLE_NAME} WHERE status = 'shipped'").collect()[0][0]
print(f"shipped 상태 업데이트 완료 (현재 shipped: {shipped}건)")

# 일부 cancelled
spark.sql(f"""
UPDATE {TABLE_NAME}
SET status = 'cancelled'
WHERE amount < 150 AND order_date >= '2024-03-01'
""")
cancelled = spark.sql(f"SELECT COUNT(*) FROM {TABLE_NAME} WHERE status = 'cancelled'").collect()[0][0]
print(f"cancelled 상태 업데이트 완료 (현재 cancelled: {cancelled}건)")

after = snapshot_tree(TABLE_PATH)
print("\n파일 변경 사항 (COW 방식):")
diff_tree(before, after)

In [None]:
# 상태별 분포 확인
print("주문 상태 분포:")
spark.sql(f"""
SELECT status, COUNT(*) as count
FROM {TABLE_NAME}
GROUP BY status
ORDER BY count DESC
""").show()

---
## Phase 3: 스키마 변경 (Schema Evolution)

비즈니스 요구사항 변경: `region` 컬럼을 추가합니다.

In [None]:
# region 컬럼 추가
spark.sql(f"ALTER TABLE {TABLE_NAME} ADD COLUMN region STRING")
print("region 컬럼 추가 완료")

# 스키마 확인
spark.sql(f"DESCRIBE {TABLE_NAME}").show()

In [None]:
# 기존 데이터에 region 값 업데이트
spark.sql(f"""
UPDATE {TABLE_NAME}
SET region = CASE
    WHEN customer_id % 3 = 0 THEN 'Seoul'
    WHEN customer_id % 3 = 1 THEN 'Busan'
    ELSE 'Jeju'
END
""")

print("기존 데이터 region 업데이트 완료")
spark.sql(f"""
SELECT region, COUNT(*) as count
FROM {TABLE_NAME}
GROUP BY region
""").show()

In [None]:
# 새 데이터는 region 포함하여 삽입
from pyspark.sql.functions import lit, when, col

orders_new = generate_orders(num_records=100, seed=99, start_date="2024-03-15", end_date="2024-03-31", id_offset=501)
df_new = to_spark_df(spark, orders_new)
df_new = df_new.withColumn("region", 
    when(col("customer_id") % 3 == 0, lit("Seoul"))
    .when(col("customer_id") % 3 == 1, lit("Busan"))
    .otherwise(lit("Jeju"))
)
df_new.writeTo(TABLE_NAME).append()

total = spark.sql(f"SELECT COUNT(*) FROM {TABLE_NAME}").collect()[0][0]
print(f"새 데이터 100건 추가 (region 포함). 총 레코드: {total}건")

---
## Phase 4: 컴팩션

여러 번의 INSERT/UPDATE로 파일이 많아졌습니다. Sort 전략으로 컴팩션합니다.

In [None]:
files_before = count_files(TABLE_PATH)
size_before = total_size(TABLE_PATH)
print(f"컴팩션 전: 파일 {files_before}개, 크기 {size_before:,} bytes")

# Sort 컴팩션 (order_date 기준)
spark.sql(f"""
CALL demo.system.rewrite_data_files(
    table => '{TABLE_NAME}',
    strategy => 'sort',
    sort_order => 'order_date ASC NULLS LAST'
)
""").show(truncate=False)

files_after = count_files(TABLE_PATH)
size_after = total_size(TABLE_PATH)
print(f"컴팩션 후: 파일 {files_after}개, 크기 {size_after:,} bytes")
print(f"파일 수 변화: {files_before} → {files_after}")

---
## Phase 5: 스냅샷 정리 (유지보수)

지금까지 생성된 스냅샷 중 최근 2개만 남기고 정리합니다.

In [None]:
# 현재 스냅샷 수 확인
snapshots = spark.sql(f"SELECT * FROM {TABLE_NAME}.snapshots").collect()
print(f"현재 스냅샷 수: {len(snapshots)}")

# 최근 2개만 유지
spark.sql(f"""
CALL demo.system.expire_snapshots(
    table => '{TABLE_NAME}',
    retain_last => 2
)
""").show(truncate=False)

snapshots_after = spark.sql(f"SELECT * FROM {TABLE_NAME}.snapshots").collect()
print(f"정리 후 스냅샷 수: {len(snapshots_after)}")

In [None]:
# Manifest 재작성
spark.sql(f"""
CALL demo.system.rewrite_manifests(
    table => '{TABLE_NAME}'
)
""").show(truncate=False)

print("유지보수 완료: Expire Snapshots + Rewrite Manifests")

In [None]:
# 유지보수 후 최종 상태
print("Phase 5 완료 — 유지보수 후 테이블 상태:")
print("=" * 60)
print(f"레코드 수: {spark.sql(f'SELECT COUNT(*) FROM {TABLE_NAME}').collect()[0][0]}")
print(f"파일 수: {count_files(TABLE_PATH)}")
print(f"스냅샷 수: {len(spark.sql(f'SELECT * FROM {TABLE_NAME}.snapshots').collect())}")

print("\n파일 구조:")
show_tree(TABLE_PATH, max_depth=3)

---
## Phase 6: 장애 복구 (Time Travel)

### 시나리오: 실수로 데이터 대량 삭제

운영자가 실수로 잘못된 DELETE 쿼리를 실행하여 데이터가 삭제되었습니다. Time Travel로 복구합니다.

In [None]:
# 삭제 전 레코드 수 기록
count_before_accident = spark.sql(f"SELECT COUNT(*) FROM {TABLE_NAME}").collect()[0][0]
print(f"삭제 전 레코드 수: {count_before_accident}")

# 현재 스냅샷 ID 기록 (복구 지점)
safe_snapshot = spark.sql(f"""
SELECT snapshot_id FROM {TABLE_NAME}.snapshots
ORDER BY committed_at DESC LIMIT 1
""").collect()[0][0]
print(f"안전한 스냅샷 ID: {safe_snapshot}")

In [None]:
# 사고 발생: 실수로 Seoul 지역 데이터 전체 삭제!
print("[사고 발생] 실수로 Seoul 지역 데이터 전체 삭제!")
spark.sql(f"DELETE FROM {TABLE_NAME} WHERE region = 'Seoul'")

count_after_accident = spark.sql(f"SELECT COUNT(*) FROM {TABLE_NAME}").collect()[0][0]
deleted = count_before_accident - count_after_accident
print(f"삭제 후 레코드 수: {count_after_accident} ({deleted}건 삭제됨!)")

print("\n현재 region 분포:")
spark.sql(f"SELECT region, COUNT(*) FROM {TABLE_NAME} GROUP BY region").show()

In [None]:
# 복구 방법 1: Time Travel로 삭제 전 데이터 확인
print(f"[복구] 스냅샷 {safe_snapshot}의 데이터 확인:")
safe_count = spark.sql(f"""
SELECT COUNT(*) FROM {TABLE_NAME}
VERSION AS OF {safe_snapshot}
""").collect()[0][0]
print(f"스냅샷 {safe_snapshot} 레코드 수: {safe_count}")

print("\n스냅샷의 region 분포:")
spark.sql(f"""
SELECT region, COUNT(*) as count
FROM {TABLE_NAME} VERSION AS OF {safe_snapshot}
GROUP BY region
""").show()

In [None]:
# 복구 방법 2: Rollback으로 테이블 상태를 이전 스냅샷으로 되돌리기
print("[복구 실행] Rollback to safe snapshot...")
spark.sql(f"""
CALL demo.system.rollback_to_snapshot(
    table => '{TABLE_NAME}',
    snapshot_id => {safe_snapshot}
)
""")

count_after_recovery = spark.sql(f"SELECT COUNT(*) FROM {TABLE_NAME}").collect()[0][0]
print(f"\n복구 완료!")
print(f"레코드 수: {count_before_accident} → {count_after_accident} (사고) → {count_after_recovery} (복구)")

print("\n복구 후 region 분포:")
spark.sql(f"SELECT region, COUNT(*) FROM {TABLE_NAME} GROUP BY region").show()

### 관찰 포인트 — 장애 복구

- `VERSION AS OF`로 삭제 전 데이터를 **먼저 확인**한 후 복구할 수 있었습니다
- `rollback_to_snapshot`으로 테이블 상태를 **원래대로 되돌렸습니다**
- Rollback은 데이터를 복사하지 않고 **메타데이터 포인터만 변경**하므로 즉시 완료됩니다
- 단, `expire_snapshots`로 삭제된 스냅샷으로는 복구할 수 없으므로 **보존 기간 설정이 중요**합니다

---
## 전체 파이프라인 요약

```
Phase 1: 일별 배치 적재        → INSERT (append)
Phase 2: 상태 업데이트          → UPDATE (COW)
Phase 3: 스키마 변경           → ALTER TABLE ADD COLUMN
Phase 4: 컴팩션               → rewrite_data_files (sort)
Phase 5: 스냅샷 정리           → expire_snapshots + rewrite_manifests
Phase 6: 장애 복구             → VERSION AS OF + rollback_to_snapshot
```

### 운영 체크리스트

| 항목 | 주기 | 명령 |
|------|------|------|
| 데이터 적재 | 매일 | `df.writeTo(table).append()` |
| 컴팩션 | 매일 | `rewrite_data_files(strategy => 'sort')` |
| 스냅샷 만료 | 매일~주간 | `expire_snapshots(retain_last => N)` |
| 고아 파일 정리 | 주간~월간 | `remove_orphan_files(dry_run => true)` |
| Manifest 재작성 | 주간 | `rewrite_manifests()` |
| 파티션 모니터링 | 주간 | `SELECT * FROM table.partitions` |
| 파일 크기 모니터링 | 주간 | `SELECT * FROM table.files` |

In [None]:
# 최종 테이블 상태
print("=" * 60)
print("최종 테이블 상태")
print("=" * 60)
print(f"레코드 수: {spark.sql(f'SELECT COUNT(*) FROM {TABLE_NAME}').collect()[0][0]}")
print(f"파일 수: {count_files(TABLE_PATH)}")
print(f"총 크기: {total_size(TABLE_PATH):,} bytes")
print(f"스냅샷 수: {len(spark.sql(f'SELECT * FROM {TABLE_NAME}.snapshots').collect())}")

print("\n스키마:")
spark.sql(f"DESCRIBE {TABLE_NAME}").show()

print("파티션:")
spark.sql(f"SELECT * FROM {TABLE_NAME}.partitions").show(truncate=False)

In [None]:
spark.stop()
print("Spark 세션 종료")
print("\nEnd-to-End 시나리오 실습 완료!")