In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from service.utils.spark import get_spark_session
# spark = SparkSession.builder.getOrCreate()
spark = get_spark_session(dev=True)
test_namespace = 'warehousedev.gold.test'
spark.conf.get('spark.sql.catalog.warehousedev.s3.region')
spark.sql(f"CREATE NAMESPACE IF NOT EXISTS {test_namespace}")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/20 15:53:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/09/20 15:53:34 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


DataFrame[]

In [2]:
def write_iceberg(df, full_table_identifier):
    writer = \
        df.writeTo(full_table_identifier)

    if not spark.catalog.tableExists(full_table_identifier):
        writer.create()
    else:
        writer.overwritePartitions()

In [3]:
# silver
order_status_df = spark.read.csv("s3a://warehousedev/bronze/tsv/order_status.tsv", header=True, sep='\t')

delivered_customer_order_id = order_status_df.filter(order_status_df.status == 'delivered_customer').select(['order_id'])
delivered_customer_order_df = order_status_df.join(delivered_customer_order_id, on='order_id', how='inner')
status_list = ['purchase', 'delivered_customer']
delivered_customer_order_df = delivered_customer_order_df.filter(F.col('status').isin(status_list))

pivoted_df = delivered_customer_order_df.groupBy('order_id') \
    .pivot('status', ['purchase', 'delivered_customer']) \
    .agg(F.first('timestamp'))

delivered_order_df = pivoted_df.withColumnsRenamed({
    'purchase': 'purchase_date',
    'delivered_customer': 'delivery_date'
})
    
ed_df = spark.read.csv("s3a://warehousedev/bronze/tsv/estimated_delivery_date.tsv", header=True, sep='\t')
delivered_order_df = delivered_order_df.join(ed_df, on='order_id', how='inner')
payment_df = spark.read.csv("s3a://warehousedev/bronze/tsv/payment.tsv", header=True, sep='\t')
payment_info_df = payment_df.select(['order_id', 'customer_id']).dropDuplicates()

# payment와 order_status에 모두 있는 order_id만 남김
clean_order_df = delivered_order_df.join(payment_info_df, on='order_id', how='inner')

format_string1 = 'yyyy-MM-dd HH:mm:ss.SSSSSS'
format_string2 = 'yyyy-MM-dd HH:mm:ss'
clean_order_df = clean_order_df.withColumns({
    'purchase_date': F.to_timestamp(F.col('purchase_date'), format_string1),
    'delivery_date': F.to_timestamp(F.col('delivery_date'), format_string1),
    'estimated_delivery_date': F.to_timestamp(F.col('estimated_delivery_date'), format_string2)
    })
clean_order_df = clean_order_df.orderBy(F.col('purchase_date').asc())
# clean_order_df.show(truncate=False)
# write_iceberg(clean_order_df, f"{test_namespace}.clean_order")

25/09/20 15:53:37 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties


In [4]:
# silver
order_item_df = spark.read.csv("s3a://warehousedev/bronze/tsv/order_item.tsv", header=True, sep='\t')
product_df = spark.read.csv("s3a://warehousedev/bronze/tsv/product.tsv", header=True, sep='\t')

taget_category = 'health_beauty'
taget_category_product = product_df.filter(product_df.category == taget_category).select(['product_id', 'category'])
taget_category_order = taget_category_product.join(order_item_df, on='product_id', how='inner')
taget_category_order = taget_category_order.sort('product_id')

# 하나의 order에 여러 item이 있을 수 있으므로, order_item_id는 남겨둔다.
# `total_price` 는 상품가와 배송비를 합산한 가격
taget_category_order = taget_category_order.withColumn('total_price', F.round(F.col('price') + F.col('freight_value'), 4))
taget_category_order = taget_category_order.drop('shipping_limit_date', 'price', 'freight_value')
clean_category_order_df = taget_category_order.join(clean_order_df.select(['order_id']), on='order_id', how='inner')
# clean_category_order_df.show(truncate=False)
# write_iceberg(clean_category_order_df, f"{test_namespace}.clean_category_order")

In [5]:
target_product_order_info = clean_order_df.join(clean_category_order_df, on='order_id', how='inner')

In [6]:
# gold
sale_stats = clean_category_order_df.groupBy('product_id').agg(
    F.count('order_id').alias('order_count'),
    F.round(F.sum(F.col('total_price')), 4).alias('total_sales')
    ).orderBy(F.col('order_count').desc())

sale_stats = sale_stats.withColumn('mean_sale', F.round(F.col('total_sales') / F.col('order_count'), 4))
# sale_stats.show(truncate=False)

In [7]:
# 1. 기준점(Threshold) 계산
# percentile_approx 함수를 사용하여 분위수 계산
order_count_threshold = sale_stats.agg(
    F.expr("percentile_approx(order_count, 0.75)")
).collect()[0][0]

median_avg_price = sale_stats.agg(
    F.expr("percentile_approx(mean_sale, 0.5)")
).collect()[0][0]

# print(order_count_threshold)
# print(median_avg_price)


# 2. 'group' 컬럼 추가
# when/otherwise와 col 함수를 사용하여 조건에 따라 그룹을 분류합니다.
classified_sale_stats = sale_stats.withColumn("segment",
    F.when((F.col("order_count") >= order_count_threshold) & (F.col("mean_sale") >= median_avg_price), "Star Products")
    .when((F.col("order_count") >= order_count_threshold) & (F.col("mean_sale") < median_avg_price), "Volume Drivers")
    .when((F.col("order_count") < order_count_threshold) & (F.col("mean_sale") >= median_avg_price), "Niche Gems")
    .otherwise("Question Marks")
)

# classified_sale_stats.show(truncate=False)

                                                                                

In [8]:
target_product_order_info_with_segment = target_product_order_info.join(classified_sale_stats.select('product_id', 'segment'), on='product_id', how='inner')
# target_product_order_info_with_segment.show()

spark.sql(f"DROP TABLE IF EXISTS {test_namespace}.target_product_order_info_with_segment")
write_iceberg(target_product_order_info_with_segment, f"{test_namespace}.target_product_order_info_with_segment")

                                                                                

In [9]:
## TODO: 정리

In [10]:
# silver: review metadata
review_df = spark.read.csv("s3a://warehousedev/bronze/tsv/review.tsv", header=True, sep='\t')

review_metadata_df = review_df.drop('review_comment_title', 'review_comment_message')
clean_review_metadata_df = review_metadata_df.join(clean_category_order_df.select('order_id', 'product_id'), on=['order_id'], how='inner')
clean_review_metadata_df = clean_review_metadata_df.orderBy('product_id')

format_string = "yyyy-MM-dd HH:mm:ss"
clean_review_metadata_df = clean_review_metadata_df \
    .withColumn('review_creation_date', F.to_timestamp(F.col('review_creation_date'), format_string)) \
    .withColumn('review_answer_timestamp', F.to_timestamp(F.col('review_answer_timestamp'), format_string))

clean_review_metadata_df = clean_review_metadata_df.withColumn('answer_lead_time', F.datediff(F.col('review_answer_timestamp'), F.col('review_creation_date')) )
clean_review_metadata_df = clean_review_metadata_df.drop('review_creation_date', 'review_answer_timestamp')
clean_review_metadata_df.show()
write_iceberg(clean_review_metadata_df, f"{test_namespace}.clean_review_metadata")

                                                                                

+--------------------+--------------------+------------+--------------------+----------------+
|            order_id|           review_id|review_score|          product_id|answer_lead_time|
+--------------------+--------------------+------------+--------------------+----------------+
|c01f1e6ceafe26dc7...|f8bdbfeda4ecce455...|           5|00210e41887c2a8ef...|               5|
|1fd6c29ecb9dd8b65...|d5ce4524953171740...|           5|00210e41887c2a8ef...|               2|
|2d8e71bf7d31a41d4...|dd8d78ad2be2e3477...|           5|00210e41887c2a8ef...|               1|
|eb24356203f63304c...|13493da42ce41d718...|           5|00210e41887c2a8ef...|               1|
|2d8e71bf7d31a41d4...|dd8d78ad2be2e3477...|           5|00210e41887c2a8ef...|               1|
|226975521c585d7bb...|bde3a6bc851b615b1...|           1|00210e41887c2a8ef...|               1|
|eb24356203f63304c...|13493da42ce41d718...|           5|00210e41887c2a8ef...|               1|
|4ff907acfe03d4b4e...|cfc901be886cfe062...|       

                                                                                

In [11]:
df = spark.read.table(f"{test_namespace}.target_product_order_info_with_segment")

In [12]:
delivery_stats = df \
    .withColumn(
        'delivery_lead_time',
        F.datediff(F.col('delivery_date'), F.col('purchase_date'))) \
    .withColumn(
        'is_late',
        F.when(F.col('delivery_date') <= F.col('estimated_delivery_date'), False)
        .otherwise(True)
    )
# delivery_stats.show()

In [13]:
delivery_stats_summary = delivery_stats.drop('purchase_date', 'delivery_date', 'estimated_delivery_date', 'total_price')
delivery_stats_summary.show()
write_iceberg(delivery_stats_summary, f"{test_namespace}.delivery_stats_summary")

+--------------------+--------------------+--------------------+-------------+-------------+--------------------+--------------+------------------+-------+
|          product_id|            order_id|         customer_id|     category|order_item_id|           seller_id|       segment|delivery_lead_time|is_late|
+--------------------+--------------------+--------------------+-------------+-------------+--------------------+--------------+------------------+-------+
|05f0fe07929d35be0...|d1c3d911ad830cab7...|eaba6c1c62884cc10...|health_beauty|            2|c70c1b0d8ca86052f...| Star Products|                 5|  false|
|05f0fe07929d35be0...|b41cc5a0872a7fd88...|ada8dea5e28ec3e93...|health_beauty|            1|c70c1b0d8ca86052f...| Star Products|                 8|  false|
|05f0fe07929d35be0...|362cd36939c1c4638...|32ca12567c820aa0b...|health_beauty|            1|c70c1b0d8ca86052f...| Star Products|                 7|  false|
|05f0fe07929d35be0...|b71ba668b12da5320...|57f36f673a04f9fa8...|

                                                                                

In [14]:
for file_name in ['geolocation', 'customer', 'seller']:
    print(file_name)
    tmp_df = spark.read.csv(f"s3a://warehousedev/bronze/tsv/{file_name}.tsv", header=True, sep='\t')
    write_iceberg(tmp_df, f"{test_namespace}.{file_name}")

geolocation
customer
seller


In [28]:
geolocation = spark.read.table(f"{test_namespace}.geolocation")
customer = spark.read.table(f"{test_namespace}.customer")
seller = spark.read.table(f"{test_namespace}.seller")
geolocation.show()


+--------+-------------------+-------------------+-----+---------+
|zip_code|                lat|                lng|state|     city|
+--------+-------------------+-------------------+-----+---------+
|    1001|-23.550189776551765|  -46.6340235559042|   SP|sao paulo|
|    1002| -23.54814573176355| -46.63497921074498|   SP|sao paulo|
|    1003| -23.54899372481316|-46.635731309975874|   SP|sao paulo|
|    1004|-23.549798842277006| -46.63475694378971|   SP|sao paulo|
|    1005|-23.549456199830747| -46.63673294803691|   SP|sao paulo|
|    1006| -23.55010181145287|-46.636136735174176|   SP|sao paulo|
|    1007|-23.550046202351666|-46.637251459637646|   SP|sao paulo|
|    1008| -23.54600174938346| -46.63588592135778|   SP|sao paulo|
|    1009|-23.546835208320868|-46.636490695312204|   SP|sao paulo|
|    1010| -23.54638943836927|-46.635226081509046|   SP|sao paulo|
|    1011| -23.54690864025111| -46.63558202636042|   SP|sao paulo|
|    1012|-23.547789733146864| -46.63485900505736|   SP|sao pa

In [None]:
from pyspark.sql.types import DecimalType
geolocation = geolocation \
    .withColumn('lat', F.col('lat').cast('float')) \
    .withColumn('lng', F.col('lng').cast('float'))
geolocation.show()
geolocation.printSchema()

+--------+----------+----------+-----+---------+
|zip_code|       lat|       lng|state|     city|
+--------+----------+----------+-----+---------+
|    1001| -23.55019| -46.63402|   SP|sao paulo|
|    1002|-23.548145| -46.63498|   SP|sao paulo|
|    1003|-23.548994| -46.63573|   SP|sao paulo|
|    1004|-23.549799|-46.634758|   SP|sao paulo|
|    1005|-23.549456|-46.636734|   SP|sao paulo|
|    1006|-23.550102|-46.636135|   SP|sao paulo|
|    1007|-23.550047|-46.637253|   SP|sao paulo|
|    1008|-23.546001|-46.635887|   SP|sao paulo|
|    1009|-23.546835| -46.63649|   SP|sao paulo|
|    1010|-23.546389|-46.635227|   SP|sao paulo|
|    1011| -23.54691|-46.635582|   SP|sao paulo|
|    1012| -23.54779|-46.634857|   SP|sao paulo|
|    1013|-23.547142|-46.634235|   SP|sao paulo|
|    1014|-23.545914|-46.633614|   SP|sao paulo|
|    1015|-23.547667|-46.631557|   SP|sao paulo|
|    1016|-23.548977| -46.63235|   SP|sao paulo|
|    1017|-23.549532|-46.631023|   SP|sao paulo|
|    1018|-23.550928

In [30]:
result_df = delivery_stats_summary.join(customer, delivery_stats_summary["customer_id"] == customer["customer_id"], "left") \
    .join(seller, delivery_stats_summary["seller_id"] == seller["seller_id"], "left") \
    .join(geolocation.alias("g1"), customer["zip_code"] == F.col("g1.zip_code"), "left") \
    .join(geolocation.alias("g2"), seller["zip_code"] == F.col("g2.zip_code"), "left")

# 최종적으로 필요한 컬럼 선택, 타입 변환(cast), 이름 변경(alias)
final_df = result_df.select(
    delivery_stats_summary["category"],
    delivery_stats_summary["product_id"],
    delivery_stats_summary["order_id"],
    delivery_stats_summary["order_item_id"],
    delivery_stats_summary["customer_id"],
    customer["zip_code"].alias("customer_zip_code"),
    F.col("g1.lat").cast(DecimalType(10, 8)).alias("customer_lat"),
    F.col("g1.lng").cast(DecimalType(10, 8)).alias("customer_lng"),
    delivery_stats_summary["seller_id"],
    seller["zip_code"].alias("seller_zip_code"),
    F.col("g2.lat").cast(DecimalType(10, 8)).alias("seller_lat"),
    F.col("g2.lng").cast(DecimalType(10, 8)).alias("seller_lng")
)

# 결과 확인
final_df.show()

NameError: name 'DecimalType' is not defined

In [None]:
delivery_stats_summary.join

+--------------------+--------------------+--------------------+-------------+-------------+--------------------+--------------+------------------+-------+
|          product_id|            order_id|         customer_id|     category|order_item_id|           seller_id|       segment|delivery_lead_time|is_late|
+--------------------+--------------------+--------------------+-------------+-------------+--------------------+--------------+------------------+-------+
|05f0fe07929d35be0...|d1c3d911ad830cab7...|eaba6c1c62884cc10...|health_beauty|            2|c70c1b0d8ca86052f...| Star Products|                 5|  false|
|05f0fe07929d35be0...|b41cc5a0872a7fd88...|ada8dea5e28ec3e93...|health_beauty|            1|c70c1b0d8ca86052f...| Star Products|                 8|  false|
|05f0fe07929d35be0...|362cd36939c1c4638...|32ca12567c820aa0b...|health_beauty|            1|c70c1b0d8ca86052f...| Star Products|                 7|  false|
|05f0fe07929d35be0...|b71ba668b12da5320...|57f36f673a04f9fa8...|