In [None]:
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
import os

# Credentials are read from the environment so they never appear in the notebook.
# Either value is None when the corresponding variable is not set.
aws_access_key_id = os.getenv('AWS_ACCESS_KEY_ID')
aws_secret_access_key = os.getenv('AWS_SECRET_ACCESS_KEY')
catalog = 'glue'
database = 'spark_drill'
spark_conf = SparkConf()

# https://github.com/apache/spark/blob/v3.5.1/pom.xml
# https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws/3.3.4
spark_conf.setAll(
    [
        ('spark.master', 'local[*]'),
        ('spark.app.name', 'spark_app'),
        # aws
        ('spark.jars.packages', 'org.apache.hadoop:hadoop-common:3.3.4,org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.12.262,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2,org.apache.iceberg:iceberg-aws-bundle:1.5.2'),
        # s3
        ('spark.hadoop.fs.s3a.endpoint', 's3.amazonaws.com'),
        ('spark.hadoop.fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem'),
        # iceberg
        ('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions'),
        (f'spark.sql.catalog.{catalog}', 'org.apache.iceberg.spark.SparkCatalog'),
        (f'spark.sql.catalog.{catalog}.catalog-impl', 'org.apache.iceberg.aws.glue.GlueCatalog'),
        (f'spark.sql.catalog.{catalog}.warehouse', 's3://de-spark-practice/tpc-h/iceberg_table/'),
        # The key prefix must be 'spark.sql.catalog.'; the previous 'spark.sqk.'
        # spelling meant this setting never reached the Iceberg catalog.
        (f'spark.sql.catalog.{catalog}.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO'),
    ]
)

# Only hand explicit keys to S3A when both are available. An unset variable
# yields None, which is not a usable credential; leaving the keys out lets
# S3A fall back to its own credential lookup instead.
if aws_access_key_id and aws_secret_access_key:
    spark_conf.setAll(
        [
            ('spark.hadoop.fs.s3a.access.key', aws_access_key_id),
            ('spark.hadoop.fs.s3a.secret.key', aws_secret_access_key),
        ]
    )

spark = SparkSession.builder\
    .config(conf=spark_conf)\
    .getOrCreate()

In [3]:
def estimtae_df_size(df) -> float:
    """Estimate the size of a DataFrame in megabytes.

    The DataFrame is cached and materialised with ``count()``, then the JVM
    ``SizeEstimator`` is asked for the size of the underlying ``df._jdf``
    object. The cache is always released before returning, even when the
    count or the estimate fails.

    Relies on the notebook-level ``spark`` session for JVM access.

    NOTE(review): SizeEstimator measures the JVM object it is handed; whether
    that tracks the size of the cached data closely is not verified here, so
    treat the result as a rough figure.

    Args:
        df: DataFrame to measure. It is unpersisted on exit, so a cache the
            caller set up beforehand is dropped as well.

    Returns:
        Estimated size in MB (bytes / 1024**2).
    """
    df.cache()
    try:
        # count() is only called for its side effect of populating the cache.
        df.count()
        size_bytes = spark._jvm.org.apache.spark.util.SizeEstimator.estimate(df._jdf)
    finally:
        df.unpersist()

    return size_bytes / 1024**2


# Correctly spelled alias; the original (misspelled) name is kept so existing
# cells that call it continue to work.
estimate_df_size = estimtae_df_size

In [4]:
from pyspark.sql.types import StructField, StructType, StringType, FloatType, TimestampType

# Explicit schema for customer.tbl: every column is nullable and read as a
# string, except acct_bal which is read as a float.
customer_schema = (
    StructType()
    .add('cust_key', StringType(), True)
    .add('name', StringType(), True)
    .add('address', StringType(), True)
    .add('nation_key', StringType(), True)
    .add('phone', StringType(), True)
    .add('acct_bal', FloatType(), True)
    .add('mkt_segment', StringType(), True)
    .add('comment', StringType(), True)
)

# The raw file is pipe-delimited and has no header row.
customer_reader = (
    spark.read
    .option('delimiter', '|')
    .option('header', False)
    .schema(customer_schema)
)
customer = customer_reader.csv('s3a://de-spark-practice/tpc-h/raw/customer.tbl')

24/06/14 16:09:49 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


24/06/14 16:09:50 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties


In [5]:
# Estimated size of the customer DataFrame, reported to two decimals.
customer_df_size_mb = estimtae_df_size(customer)
print(f'customer:  {round(customer_df_size_mb, 2)} MB')

                                                                                

customer:  28.8 MB


In [6]:
# Baseline table: the DataFrame is written exactly as read, with no
# repartitioning or sorting applied first.
customer_unprocessed_writer = customer.write.format('iceberg').mode('overwrite')
customer_unprocessed_writer.saveAsTable(f'{catalog}.{database}.customer_unprocessed')

                                                                                

The customer table is small, so it can be coalesced into a single partition.

In [9]:
# Collapse the DataFrame to one partition before writing it out.
customer_one_partition = customer.repartition(1)
(
    customer_one_partition.write
    .format('iceberg')
    .mode('overwrite')
    .saveAsTable(f'{catalog}.{database}.customer_single_partition')
)

                                                                                

In [18]:
# Size, in MB, of each data file listed in the table's `files` metadata table.
query = f'''
    SELECT 
        file_size_in_bytes/power(1024,2) as file_size_in_MB 
    FROM {catalog}.{database}.customer_single_partition.files;
'''
customer_file_sizes = spark.sql(query)
customer_file_sizes.show()

+-----------------+
|  file_size_in_MB|
+-----------------+
|7.593967437744141|
+-----------------+



In [10]:
# Explicit schema for orders.tbl: every column is nullable; total_price is
# read as a float and order_date as a timestamp, everything else as a string.
# NOTE(review): FloatType is single precision, which may not hold large
# total_price values exactly to the cent - confirm whether that matters here.
order_schema = (
    StructType()
    .add('order_key', StringType(), True)
    .add('cust_key', StringType(), True)
    .add('order_status', StringType(), True)
    .add('total_price', FloatType(), True)
    .add('order_date', TimestampType(), True)
    .add('order_priority', StringType(), True)
    .add('clerk', StringType(), True)
    .add('ship_priority', StringType(), True)
    .add('comment', StringType(), True)
)

# The raw file is pipe-delimited and has no header row.
orders_reader = (
    spark.read
    .option('delimiter', '|')
    .option('header', False)
    .schema(order_schema)
)
orders = orders_reader.csv('s3a://de-spark-practice/tpc-h/raw/orders.tbl')

In [11]:
# Estimated size of the orders DataFrame, reported to two decimals.
orders_df_size_mb = estimtae_df_size(orders)
print(f'orders:  {round(orders_df_size_mb, 2)} MB')



orders:  134.61 MB


                                                                                

In [13]:
# Baseline table: the DataFrame is written exactly as read, with no
# repartitioning or sorting applied first.
orders_unprocessed_writer = orders.write.format('iceberg').mode('overwrite')
orders_unprocessed_writer.saveAsTable(f'{catalog}.{database}.orders_unprocessed')

                                                                                

In [32]:
# Count the distinct values of each candidate column; the low-cardinality
# ones are the candidates to sort by.
orders_unprocessed_table = f'{catalog}.{database}.orders_unprocessed'
cardinality_columns = ['order_status', 'order_date', 'order_priority', 'ship_priority']

# One single-row SELECT per column, glued together with UNION ALL.
cardinality_selects = [
    f"    SELECT '{column}' as col, count(distinct {column}) as cardinality from {orders_unprocessed_table}"
    for column in cardinality_columns
]
query = '\n' + '\n    union all\n'.join(cardinality_selects) + '\n'
spark.sql(query).show()



+--------------+-----------+
|           col|cardinality|
+--------------+-----------+
|  order_status|          3|
|    order_date|       2406|
|order_priority|          5|
| ship_priority|          1|
+--------------+-----------+



                                                                                

In [30]:
# Print the physical plan of the query currently held in `query`.
cardinality_plan = spark.sql(query)
cardinality_plan.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Union
   :- HashAggregate(keys=[], functions=[count(distinct order_status#1467)])
   :  +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=948]
   :     +- HashAggregate(keys=[], functions=[partial_count(distinct order_status#1467)])
   :        +- HashAggregate(keys=[order_status#1467], functions=[])
   :           +- Exchange hashpartitioning(order_status#1467, 200), ENSURE_REQUIREMENTS, [plan_id=944]
   :              +- HashAggregate(keys=[order_status#1467], functions=[])
   :                 +- BatchScan glue.spark_drill.orders_unprocessed[order_status#1467] glue.spark_drill.orders_unprocessed (branch=null) [filters=, groupedBy=] RuntimeFilters: []
   :- HashAggregate(keys=[], functions=[count(distinct order_date#1478)])
   :  +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=954]
   :     +- HashAggregate(keys=[], functions=[partial_count(distinct order_date#1478)])
   :        +- HashAggregate(keys=[ord

Using **order_status** as an example.
- The table is not partitioned. Spark launched 3 tasks to read the table, and each task read approximately the same number of records. ![alt text](order_status_task.jpg)
- A local aggregation on *order_status* is performed. This gives the distinct order_status values within each task.
- A shuffle then rearranges the records so that records with the same order_status sit together in the same partition.
- AQE coalesces the number of partitions to 1 after the shuffle.
- Another local aggregation on *order_status* is performed to remove duplicates.
- The partial count then counts the number of distinct records.
- The last shuffle and aggregate are redundant, since AQE has already coalesced the number of partitions to 1.

In [47]:
# Collapse orders to one partition, then sort the rows inside that partition
# by order_status, order_priority and order_date before writing.
orders_sorted = (
    orders
    .repartition(1)
    .sortWithinPartitions('order_status', 'order_priority', 'order_date')
)
(
    orders_sorted.write
    .format('iceberg')
    .mode('overwrite')
    .saveAsTable(f'{catalog}.{database}.orders_single_partition_sorted')
)

                                                                                

In [48]:
# Total data-file size (MB, two decimals) of the unsorted table next to the
# sorted one, read from each table's `files` metadata table.
query = f'''
    SELECT 
        'unsorted' as desc, round(sum(file_size_in_bytes/power(1024,2)),2) as file_size_in_MB 
    FROM {catalog}.{database}.orders_unprocessed.files

    union all 

    SELECT 
        'sorted' , round(sum(file_size_in_bytes/power(1024,2)),2) 
    FROM {catalog}.{database}.orders_single_partition_sorted.files;
'''
size_comparison = spark.sql(query)
size_comparison.show()

+--------+---------------+
|    desc|file_size_in_MB|
+--------+---------------+
|unsorted|          34.97|
|  sorted|          36.72|
+--------+---------------+



In [49]:
# Rewrite the table's data files with Iceberg's sort strategy, using the same
# sort order as the initial write. 'min-input-files' = '1' is passed so the
# rewrite is not skipped for having too few input files.
# The procedure is addressed through `{catalog}` rather than a hard-coded
# catalog name, so it follows the catalog configured at the top of the notebook.
query = f'''
    CALL {catalog}.system.rewrite_data_files(
    table => '{database}.orders_single_partition_sorted', 
    strategy => 'sort', 
    options => map('min-input-files','1'),
    sort_order => 'order_status ASC,  order_priority ASC, order_date ASC' 
    )
'''
spark.sql(query).show()

                                                                                

+--------------------------+----------------------+---------------------+-----------------------+
|rewritten_data_files_count|added_data_files_count|rewritten_bytes_count|failed_data_files_count|
+--------------------------+----------------------+---------------------+-----------------------+
|                         3|                     1|             38501568|                      0|
+--------------------------+----------------------+---------------------+-----------------------+



In [50]:
# Same size comparison as before the rewrite: total data-file size (MB, two
# decimals) of the unsorted table next to the sorted one.
query = f'''
    SELECT 
        'unsorted' as desc, round(sum(file_size_in_bytes/power(1024,2)),2) as file_size_in_MB 
    FROM {catalog}.{database}.orders_unprocessed.files

    union all 

    SELECT 
        'sorted' , round(sum(file_size_in_bytes/power(1024,2)),2) 
    FROM {catalog}.{database}.orders_single_partition_sorted.files;
'''
size_comparison_after_rewrite = spark.sql(query)
size_comparison_after_rewrite.show()

+--------+---------------+
|    desc|file_size_in_MB|
+--------+---------------+
|unsorted|          34.97|
|  sorted|          36.63|
+--------+---------------+

