## Part 1. Spark 코드 실행


- final_df를 만드는 부분까지는 Cassandra 실습과 동일

In [4]:
from typing import Dict

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import MapType, IntegerType, StringType, StructType, StructField, ArrayType, TimestampType, \
    BooleanType, FloatType


def get_promotions_dict(promotions_df):
    """Collect a small promotions DataFrame to the driver as a lookup table.

    Every row is converted with ``asDict()`` and indexed by its
    ``promotion_id`` field. If several rows share the same id, the last
    collected row wins.

    :param promotions_df: DataFrame containing a ``promotion_id`` column.
    :return: ``{promotion_id: row_as_dict}``.
    """
    collected_rows = promotions_df.rdd.collect()
    return {
        as_dict["promotion_id"]: as_dict
        for as_dict in (row.asDict() for row in collected_rows)
    }


# Spark session running in local mode with a single worker thread.
# (The app name appears to be carried over from the Cassandra exercise.)
ss: SparkSession = (
    SparkSession.builder
    .master("local")
    .appName("cassandra ex")
    .getOrCreate()
)

# 1. Load the input datasets.
input_root_path = "/home/iceberg/notebooks/input_data"

# 1.1 products
products_schema = StructType(
    [
        StructField("product_id", IntegerType(), False),
        StructField("name", StringType(), False),
        StructField("category_ids", ArrayType(IntegerType()), False),
    ]
)
# Read products with the explicit schema and rename "name" to "product_name".
products_df = (
    ss.read
    .option("inferSchema", True)
    .schema(products_schema)
    .json(f"{input_root_path}/products.json")
    .withColumnRenamed("name", "product_name")
)

# 1.2 items
items_schema = StructType(
    [
        StructField("item_id", IntegerType(), False),
        StructField("product_id", IntegerType(), False),
        StructField("seller_id", IntegerType(), False),
        StructField("promotion_ids", ArrayType(IntegerType()), True),
        StructField("original_title", StringType(), False),
        StructField("search_tag", ArrayType(StringType()), True),
        StructField("price", IntegerType(), False),
        StructField("create_timestamp", TimestampType(), False),
        StructField("update_timestamp", TimestampType(), False),
        StructField("attrs", MapType(StringType(), StringType()), False),
        StructField("free_shipping", BooleanType(), False),
    ]
)
# Read items with the explicit schema and give the timestamp/title columns
# item-specific names.
items_df = (
    ss.read
    .option("inferSchema", True)
    .schema(items_schema)
    .json(f"{input_root_path}/items.json")
    .withColumnRenamed("create_timestamp", "item_create_timestamp")
    .withColumnRenamed("update_timestamp", "item_update_timestamp")
    .withColumnRenamed("original_title", "original_item_title")
)

# 1.3 categories (product-level)
# Exactly two columns, id first: this frame is later turned into an
# {category_id: category_name} dict on the driver.
categories_schema = StructType(
    [
        StructField("category_id", IntegerType(), False),
        StructField("category_name", StringType(), False),
    ]
)
categories_df = (
    ss.read
    .option("inferSchema", True)
    .schema(categories_schema)
    .json(f"{input_root_path}/categories.json")
)

# 1.4 reviews (product-level)
reviews_schema = StructType(
    [
        StructField("review_id", IntegerType()),
        StructField("product_id", IntegerType()),
        StructField("content", StringType()),
        StructField("score", IntegerType()),
    ]
)

# Read reviews and prefix the score/content columns with "review_".
reviews_df = (
    ss.read
    .option("inferSchema", True)
    .schema(reviews_schema)
    .json(f"{input_root_path}/reviews.json")
    .withColumnRenamed("score", "review_score")
    .withColumnRenamed("content", "review_content")
)

# 1.5 promotions (item-level)
# Start/end dates are kept as plain strings.
# NOTE(review): FloatType is 32-bit, so a rate such as 0.3 later shows up as
# 0.30000001192092896 in the stored promotion strings; DoubleType would avoid
# that artifact. Consider switching.
promotions_schema = StructType(
    [
        StructField("promotion_id", IntegerType()),
        StructField("name", StringType()),
        StructField("discount_rate", FloatType()),
        StructField("start_date", StringType()),
        StructField("end_date", StringType()),
    ]
)

promotions_df = (
    ss.read
    .option("inferSchema", True)
    .schema(promotions_schema)
    .json(f"{input_root_path}/promotions.json")
)

# 1.6 sellers (item-level)
# Declare the schema explicitly and apply it to the read, as for every other input.
# seller_id is therefore read as int, matching items_df.seller_id (the join key),
# instead of the bigint that Spark's JSON schema inference produces for integers.
# Columns not listed here are not loaded.
sellers_schema = StructType([
    StructField("seller_id", IntegerType()),
    StructField("name", StringType()),
])
sellers_df = ss.read.option("inferSchema", True) \
    .schema(sellers_schema).json(f"{input_root_path}/sellers.json") \
    .withColumnRenamed("name", "seller_name")

# 2. transformation

# 2.1 Collect the small lookup tables to the driver as plain dicts.
#     Precondition: the category and promotion datasets are very small (< 10MB),
#     so pulling them onto the driver is safe.
categories_dict: Dict = dict(categories_df.rdd.collect())  # {category_id: category_name}
promotions_dict: Dict = get_promotions_dict(promotions_df)  # {promotion_id: row as dict}


def get_categories(category_ids: list):
    """Map each category id to its name using the module-level ``categories_dict``.

    Used as the body of a Spark UDF over the ``category_ids`` array column.

    :param category_ids: category ids, or ``None`` when the column value is null.
    :return: ``{category_id: category_name}``; an empty dict for ``None``.
    :raises KeyError: if an id is not present in ``categories_dict``.
    """
    # A null array column reaches a Python UDF as None; treat it as
    # "no categories" instead of failing the whole job with a TypeError.
    if category_ids is None:
        return {}
    return {cid: categories_dict[cid] for cid in category_ids}


# 2.2 Product-level transformations.

# Per-product review statistics: mean score and number of reviews.
reviews_score_df = (
    reviews_df
    .groupby(F.col("product_id"))
    .agg(
        F.mean(F.col("review_score")).alias("review_mean_score"),
        F.count(F.col("review_id")).alias("review_count"),
    )
)

# Replace category ids with a {category_id: category_name} map, then attach the
# review statistics. The left join keeps products that have no reviews.
get_categories_udf = udf(get_categories, MapType(IntegerType(), StringType()))
df1 = (
    products_df
    .withColumn("categories", get_categories_udf(F.col("category_ids")))
    .drop(F.col("category_ids"))
    .join(reviews_score_df, on="product_id", how="left")
)


# 2.3 item levels
def get_promotions(promotion_ids: list):
    """Map each promotion id to its promotion record using ``promotions_dict``.

    Used as the body of a Spark UDF over the ``promotion_ids`` array column,
    which the items schema declares as nullable.

    :param promotion_ids: promotion ids, or ``None`` when the column value is null.
    :return: ``{promotion_id: promotion_dict}``; an empty dict for ``None``.
    :raises KeyError: if an id is not present in ``promotions_dict``.
    """
    # Items without promotions may carry a null array, which a Python UDF
    # receives as None; iterating it would raise TypeError and fail the job.
    if promotion_ids is None:
        return {}
    return {pid: promotions_dict[pid] for pid in promotion_ids}


get_promotions_udf = udf(get_promotions, MapType(IntegerType(), StringType()))

# Item-level frame: replace promotion ids with a {promotion_id: promotion} map and
# attach seller details (the left join keeps items whose seller is missing).
# NOTE(review): get_promotions returns dict values, but the map's value type is
# StringType, so each promotion is stored as a flattened string such as
# "{end_date=..., name=..., promotion_id=1, ...}" rather than as structured fields.
df2 = (
    items_df
    .withColumn("promotions", get_promotions_udf(F.col("promotion_ids")))
    .drop(F.col("promotion_ids"))
    .join(sellers_df, on="seller_id", how="left")
)
df2.show()

# 2.4 Combine product-level and item-level data. The inner join keeps only items
# whose product_id matches a product.
final_df = df1.join(df2, on="product_id", how="inner")

# Order the columns alphabetically.
sorted_columns = sorted(final_df.columns)
final_df = final_df.select(sorted_columns)

final_df.show()
final_df.printSchema()

## Part 2. Iceberg table 생성

#### CREATE

In [11]:
%%sql

-- Create the database (namespace) if it does not exist yet.
-- Spark SQL only recognizes "--" and "/* */" comments; a "#" line is a parse error.
CREATE DATABASE IF NOT EXISTS fc_catalogs;

In [12]:
%%sql

-- Remove any previous version of the table so it can be created again from final_df.
DROP TABLE IF EXISTS fc_catalogs.central_catalog;

In [13]:
# Create the Iceberg table with the same schema as the Spark DataFrame.
# (Creating it with a DDL statement, CREATE TABLE ... USING iceberg, also works.)
# DataFrameWriterV2 is the write API Iceberg recommends: using("iceberg") makes the
# table format explicit even if the session catalog is in use, and create() fails
# if the table already exists (hence the DROP TABLE IF EXISTS beforehand).
final_df.writeTo("fc_catalogs.central_catalog").using("iceberg").create()

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
                                                                                

In [14]:
%%sql

-- Show the table's columns and types, plus extended table metadata.
DESCRIBE EXTENDED fc_catalogs.central_catalog;

col_name,data_type,comment
attrs,"map<string,string>",
categories,"map<int,string>",
free_shipping,boolean,
item_create_timestamp,timestamp,
item_id,int,
item_update_timestamp,timestamp,
original_item_title,string,
price,int,
product_id,int,
product_name,string,


### Spark queries

https://iceberg.apache.org/docs/latest/spark-queries/

#### READ

In [16]:
%%sql

-- Read every row of the table.
SELECT * FROM fc_catalogs.central_catalog;

attrs,categories,free_shipping,item_create_timestamp,item_id,item_update_timestamp,original_item_title,price,product_id,product_name,promotions,review_count,review_mean_score,search_tag,seller_id,seller_name
"{'color': 'red', 'disk': '64GB', 'ram': '8GB'}","{2: '스마트폰', 3: '랩탑/노트북'}",True,2023-04-10 00:00:00,1,2023-05-06 11:43:42,Iphone14,1000000,1,Iphone14,"{1: '{end_date=2023-05-31 23:59:59, name=5월 한정 초특가 할인, promotion_id=1, discount_rate=0.30000001192092896, start_date=2023-05-01 00:00:00}'}",4,3.0,"['아이폰', '아이폰14', '아이폰 최신']",1,Apple 공식 리셀러
"{'color': 'blue', 'disk': '128GB', 'ram': '16GB'}","{2: '스마트폰', 3: '랩탑/노트북'}",True,2023-04-13 00:00:00,2,2023-05-06 11:43:42,Iphone14 Pro,1300000,1,Iphone14,"{1: '{end_date=2023-05-31 23:59:59, name=5월 한정 초특가 할인, promotion_id=1, discount_rate=0.30000001192092896, start_date=2023-05-01 00:00:00}'}",4,3.0,"['아이폰', '아이폰14pro', '최신아이폰']",1,Apple 공식 리셀러
"{'color': 'blue', 'disk': '128GB', 'ram': '16GB'}","{2: '스마트폰', 3: '랩탑/노트북'}",False,2023-03-18 00:50:00,3,2023-05-06 11:43:42,Iphone14 Pro,1300000,1,Iphone14,{},4,3.0,"['아이폰', '아이폰14pro', '아이폰 14', 'Iphone', 'Iphone14']",2,T공식대리점
"{'color': 'silver', 'disk': '512GB', 'ram': '16GB'}","{1: 'IT 전자기기', 3: '랩탑/노트북'}",True,2023-04-10 00:00:00,4,2023-05-06 11:43:42,M1 Macbook Pro,2000000,2,M1 Macbook Pro,"{3: '{end_date=2022-11-14 00:00:00, name=블랙프라이데이 특가, promotion_id=3, discount_rate=0.20000000298023224, start_date=2022-11-01 00:00:00}'}",3,3.6666666666666665,"['맥북', '맥북프로', 'M1맥북', 'Mackbook']",1,Apple 공식 리셀러
"{'color': 'space grey', 'disk': '512GB', 'ram': '24GB'}","{1: 'IT 전자기기', 3: '랩탑/노트북'}",True,2023-04-10 00:00:00,5,2023-05-06 11:43:42,M1 Macbook Pro,2500000,2,M1 Macbook Pro,"{2: '{end_date=2023-03-10 23:59:59, name=신학기 프로모션, promotion_id=2, discount_rate=0.15000000596046448, start_date=2023-03-01 00:00:00}'}",3,3.6666666666666665,"['Mackbook', '맥북 프로', '맥북', '맥', 'MAC']",1,Apple 공식 리셀러


In [25]:
%%sql
-- Row count per item_id, e.g. to check that the joins did not duplicate items.
SELECT item_id, count(*) 
FROM fc_catalogs.central_catalog 
GROUP BY item_id;

item_id,count(1)
1,1
2,1
3,1
4,1
5,1


In [26]:
%%sql
-- Items priced above 1,300,000.
SELECT * FROM fc_catalogs.central_catalog WHERE price > 1300000;

attrs,categories,free_shipping,item_create_timestamp,item_id,item_update_timestamp,original_item_title,price,product_id,product_name,promotions,review_count,review_mean_score,search_tag,seller_id,seller_name
"{'color': 'silver', 'disk': '512GB', 'ram': '16GB'}","{1: 'IT 전자기기', 3: '랩탑/노트북'}",True,2023-04-10 00:00:00,4,2023-05-06 11:43:42,M1 Macbook Pro,2000000,2,M1 Macbook Pro,"{3: '{end_date=2022-11-14 00:00:00, name=블랙프라이데이 특가, promotion_id=3, discount_rate=0.20000000298023224, start_date=2022-11-01 00:00:00}'}",3,3.6666666666666665,"['맥북', '맥북프로', 'M1맥북', 'Mackbook']",1,Apple 공식 리셀러
"{'color': 'space grey', 'disk': '512GB', 'ram': '24GB'}","{1: 'IT 전자기기', 3: '랩탑/노트북'}",True,2023-04-10 00:00:00,5,2023-05-06 11:43:42,M1 Macbook Pro,2500000,2,M1 Macbook Pro,"{2: '{end_date=2023-03-10 23:59:59, name=신학기 프로모션, promotion_id=2, discount_rate=0.15000000596046448, start_date=2023-03-01 00:00:00}'}",3,3.6666666666666665,"['Mackbook', '맥북 프로', '맥북', '맥', 'MAC']",1,Apple 공식 리셀러


#### INSERT

https://iceberg.apache.org/docs/latest/spark-writes/#insert-into

### Partitioning

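- iceberg table의 Partitioning은 테이블을 만든 후에도 변경 가능 (partition evolution). 변경은 metadata 작업이라 기존 데이터 파일은 다시 쓰지 않고 이전 Partitioning 그대로 유지되며, 새로 기록되는 데이터부터 새 Partitioning이 적용됨.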
- 이 때 Query Plan이 분기되어, Partitioning 방법이 바뀌기 전에 기록된 데이터에는 이전 Partitioning 방법에 기초해 쿼리하고, 바뀐 후에는 바뀐 방법으로 쿼리.
- WHERE 절의 특정 구문은 자동으로 Partition filter로 변경되어 필요한 파일만 읽는 것이 가능 (Hidden Partitioning)

In [29]:
%%sql
ALTER TABLE fc_catalogs.central_catalog
ADD PARTITION FIELD product_id