In [0]:
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql import SparkSession



# Locations of the two source CSV files.
product_file = "/Volumes/workspace/ramakrishna/rk/products.csv"
order_file = "/Volumes/workspace/ramakrishna/rk/orders.csv"

# Both files are read with a header row and with column types inferred by Spark.
df_products = spark.read.options(header=True, inferSchema=True).csv(product_file)
df_orders = spark.read.options(header=True, inferSchema=True).csv(order_file)



In [0]:
# Print the first five order rows without truncating values, followed by the schema.
df_orders.show(n=5, truncate=False)
df_orders.printSchema()

+--------+----------+-----------+--------+----------+------------+
|order_id|product_id|customer_id|quantity|order_date|total_amount|
+--------+----------+-----------+--------+----------+------------+
|1       |80        |10         |4       |2023-03-20|1003        |
|2       |69        |30         |3       |2023-12-11|780         |
|3       |61        |20         |4       |2023-04-26|1218        |
|4       |62        |44         |3       |2023-08-26|2022        |
|5       |78        |46         |4       |2023-08-05|1291        |
+--------+----------+-----------+--------+----------+------------+
only showing top 5 rows
root
 |-- order_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- order_date: date (nullable = true)
 |-- total_amount: integer (nullable = true)



In [0]:
# Print the first five product rows without truncating values, followed by the schema.
df_products.show(n=5, truncate=False)
df_products.printSchema()

+----------+------------+-----------+-------+-----+-----+
|product_id|product_name|category   |brand  |price|stock|
+----------+------------+-----------+-------+-----+-----+
|1         |Product_1   |Electronics|Brand_4|26   |505  |
|2         |Product_2   |Apparel    |Brand_4|489  |15   |
|3         |Product_3   |Apparel    |Brand_4|102  |370  |
|4         |Product_4   |Groceries  |Brand_1|47   |433  |
|5         |Product_5   |Groceries  |Brand_3|244  |902  |
+----------+------------+-----------+-------+-----+-----+
only showing top 5 rows
root
 |-- product_id: integer (nullable = true)
 |-- product_name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: integer (nullable = true)
 |-- stock: integer (nullable = true)



In [0]:
# Count the distinct product_id values present in the products data.
(
    df_products
    .select("product_id")
    .distinct()
    .count()
)

100

In [0]:
# Count the distinct order_id values present in the orders data.
(
    df_orders
    .select("order_id")
    .distinct()
    .count()
)

1000

Bucketing in Joins: CSV-backed join vs. join on Delta tables Z-ordered by product_id

In [0]:
# Inner join of orders to products on product_id. The products side carries a
# "shuffle_hash" join-strategy hint.
df_orders_product_details = df_orders.join(
    df_products.hint("shuffle_hash"),
    "product_id",
    "inner",
)

In [0]:
# Print the physical plan of the hinted join (simple mode is the default output).
df_orders_product_details.explain(mode="simple")

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- == Initial Plan ==
   ColumnarToRow
   +- PhotonResultStage
      +- PhotonProject [product_id#13557, order_id#13556, customer_id#13558, quantity#13559, order_date#13560, total_amount#13561, product_name#13533, category#13534, brand#13535, price#13536, stock#13537]
         +- PhotonShuffledHashJoin [product_id#13557], [product_id#13532], Inner, BuildRight
            :- PhotonShuffleExchangeSource
            :  +- PhotonShuffleMapStage ENSURE_REQUIREMENTS, [id=#10356]
            :     +- PhotonShuffleExchangeSink hashpartitioning(product_id#13557, 1024)
            :        +- PhotonFilter isnotnull(product_id#13557)
            :           +- PhotonRowToColumnar
            :              +- FileScan csv [order_id#13556,product_id#13557,customer_id#13558,quantity#13559,order_date#13560,total_amount#13561] Batched: false, DataFilters: [isnotnull(product_id#13557)], Format: CSV, Location: InMemoryFileIndex(1 paths)[dbfs:/Volu

In [0]:
# Run the hinted join and return the number of joined rows.
df_orders_product_details.count()

1000

In [0]:
def _write_delta_zordered(df, table_name, zorder_column="product_id"):
    """Overwrite the Delta table `table_name` with `df`, then Z-order it.

    Args:
        df: DataFrame whose contents are written to the table.
        table_name: Name passed to ``saveAsTable`` and to ``OPTIMIZE``. It is
            interpolated directly into the SQL text, so it must be a trusted
            identifier rather than user input.
        zorder_column: Column used in the ``ZORDER BY`` clause.

    Returns:
        The DataFrame returned by ``spark.sql`` for the ``OPTIMIZE`` statement.
    """
    df.write.format("delta").mode("overwrite").saveAsTable(table_name)
    return spark.sql(f"OPTIMIZE {table_name} ZORDER BY ({zorder_column})")


# NOTE(review): despite the "_bucketed" suffix, no bucketBy call is made here;
# the tables are written as Delta and then Z-ordered by product_id. Confirm
# that this naming is intentional.
_write_delta_zordered(df_products, "products_bucketed")

# Last expression of the cell: the OPTIMIZE result for the orders table is
# what the notebook displays.
_write_delta_zordered(df_orders, "orders_bucketed")

DataFrame[path: string, metrics: struct<numFilesAdded:bigint,numFilesRemoved:bigint,filesAdded:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,filesRemoved:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,partitionsOptimized:bigint,zOrderStats:struct<strategyName:string,inputCubeFiles:struct<num:bigint,size:bigint>,inputOtherFiles:struct<num:bigint,size:bigint>,inputNumCubes:bigint,mergedFiles:struct<num:bigint,size:bigint>,numOutputCubes:bigint,mergedNumCubes:bigint>,clusteringStats:struct<inputZCubeFiles:struct<numFiles:bigint,size:bigint>,inputOtherFiles:struct<numFiles:bigint,size:bigint>,inputNumZCubes:bigint,mergedFiles:struct<numFiles:bigint,size:bigint>,numOutputZCubes:bigint>,numBins:bigint,numBatches:bigint,totalConsideredFiles:bigint,totalFilesSkipped:bigint,preserveInsertionOrder:boolean,numFilesSkippedToReduceWriteAmplification:bigint,numBytesSkippedToReduceWriteAmplification:bigint,startTimeMs:bigint,endTimeMs:bigint,

In [0]:
# Load the two saved tables back as DataFrames.
df_orders_bucketed = spark.read.table("orders_bucketed")
df_products_bucketed = spark.read.table("products_bucketed")

In [0]:
# Same inner join on product_id as before, but reading from the saved tables
# and without any join-strategy hint.
df_orders_product_details_bucketed = df_orders_bucketed.join(
    df_products_bucketed,
    "product_id",
    "inner",
)

In [0]:
# Print the physical plan of the table-backed join (simple mode is the default output).
df_orders_product_details_bucketed.explain(mode="simple")

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- == Initial Plan ==
   ColumnarToRow
   +- PhotonResultStage
      +- PhotonProject [product_id#14246, order_id#14245, customer_id#14247, quantity#14248, order_date#14249, total_amount#14250, product_name#14252, category#14253, brand#14254, price#14255, stock#14256]
         +- PhotonBroadcastHashJoin [product_id#14246], [product_id#14251], Inner, BuildRight, false, true
            :- PhotonScan parquet workspace.default.orders_bucketed[order_id#14245,product_id#14246,customer_id#14247,quantity#14248,order_date#14249,total_amount#14250] DataFilters: [isnotnull(product_id#14246)], DictionaryFilters: [], Format: parquet, Location: PreparedDeltaFileIndex(1 paths)[s3://dbstorage-prod-r2zga/uc/72dd60e6-62db-4693-93d4-8d4a5108cae4..., OptionalDataFilters: [hashedrelationcontains(product_id#14246)], PartitionFilters: [], ReadSchema: struct<order_id:int,product_id:int,customer_id:int,quantity:int,order_date:date,total_amount:int>, Requ

In [0]:
# Run the table-backed join and return the number of joined rows.
df_orders_product_details_bucketed.count()

1000

Bucketing in Aggregations: CSV-backed DataFrame vs. Delta table Z-ordered by product_id

In [0]:
# Show the first five order rows again, untruncated, as a reminder of the columns.
df_orders.show(n=5, truncate=False)

+--------+----------+-----------+--------+----------+------------+
|order_id|product_id|customer_id|quantity|order_date|total_amount|
+--------+----------+-----------+--------+----------+------------+
|1       |80        |10         |4       |2023-03-20|1003        |
|2       |69        |30         |3       |2023-12-11|780         |
|3       |61        |20         |4       |2023-04-26|1218        |
|4       |62        |44         |3       |2023-08-26|2022        |
|5       |78        |46         |4       |2023-08-05|1291        |
+--------+----------+-----------+--------+----------+------------+
only showing top 5 rows


In [0]:
# Baseline: total sales per product computed on the CSV-backed orders DataFrame.
df_product_sales = df_orders.groupBy("product_id").agg(
    F.sum(F.col("total_amount")).alias("sales")
)

# Print the physical plan for the baseline aggregation.
df_product_sales.explain(mode="simple")

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- == Initial Plan ==
   ColumnarToRow
   +- PhotonResultStage
      +- PhotonGroupingAgg(keys=[product_id#13557], functions=[finalmerge_sum(merge sum#14363L) AS sum(total_amount)#14361L])
         +- PhotonShuffleExchangeSource
            +- PhotonShuffleMapStage ENSURE_REQUIREMENTS, [id=#11361]
               +- PhotonShuffleExchangeSink hashpartitioning(product_id#13557, 1024)
                  +- PhotonGroupingAgg(keys=[product_id#13557], functions=[partial_sum(total_amount#13561) AS sum#14363L])
                     +- PhotonRowToColumnar
                        +- FileScan csv [product_id#13557,total_amount#13561] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[dbfs:/Volumes/workspace/ramakrishna/rk/orders.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<product_id:int,total_amount:int>


== Photon Explanation ==
The query is fully supported by Photon.


In [0]:
# Same per-product sales total, computed on the `orders_bucketed` table.
# This rebinds `df_product_sales`, replacing the CSV-backed result from the
# previous cell.
# NOTE(review): the table is loaded via spark.table("orders_bucketed"); no
# bucketBy call is visible in this notebook - confirm that the plan difference
# is really attributable to bucketing.
df_product_sales = df_orders_bucketed.groupBy("product_id").agg(
    F.sum(F.col("total_amount")).alias("sales")
)

# Print the physical plan for comparison with the baseline.
df_product_sales.explain(mode="simple")

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- == Initial Plan ==
   ColumnarToRow
   +- PhotonResultStage
      +- PhotonGroupingAgg(keys=[product_id#14378], functions=[sum(total_amount#14382)])
         +- PhotonScan parquet workspace.default.orders_bucketed[product_id#14378,total_amount#14382] DataFilters: [], DictionaryFilters: [], Format: parquet, Location: PreparedDeltaFileIndex(1 paths)[s3://dbstorage-prod-r2zga/uc/72dd60e6-62db-4693-93d4-8d4a5108cae4..., OptionalDataFilters: [], PartitionFilters: [], ReadSchema: struct<product_id:int,total_amount:int>, RequiredDataFilters: []


== Photon Explanation ==
The query is fully supported by Photon.
== Optimizer Statistics (table names per statistics state) ==
  missing = 
  partial = 
  full    = orders_bucketed



Bucket Pruning: filtering on product_id before aggregating


In [0]:
# Restrict the table-backed orders to product_id 1, then total the sales for it.
df_product_sales_bucket_pruning = (
    df_orders_bucketed
    .where(F.col("product_id") == 1)
    .groupBy("product_id")
    .agg(F.sum(F.col("total_amount")).alias("sales"))
)

# Print the physical plan to see how the product_id filter is applied at the scan.
df_product_sales_bucket_pruning.explain(mode="simple")

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- == Initial Plan ==
   ColumnarToRow
   +- PhotonResultStage
      +- PhotonGroupingAgg(keys=[product_id#14413], functions=[sum(total_amount#14417)])
         +- PhotonScan parquet workspace.default.orders_bucketed[product_id#14413,total_amount#14417] DataFilters: [isnotnull(product_id#14413), (product_id#14413 = 1)], DictionaryFilters: [(product_id#14413 = 1)], Format: parquet, Location: PreparedDeltaFileIndex(1 paths)[s3://dbstorage-prod-r2zga/uc/72dd60e6-62db-4693-93d4-8d4a5108cae4..., OptionalDataFilters: [], PartitionFilters: [], ReadSchema: struct<product_id:int,total_amount:int>, RequiredDataFilters: [isnotnull(product_id#14413), (product_id#14413 = 1)]


== Photon Explanation ==
The query is fully supported by Photon.
== Optimizer Statistics (table names per statistics state) ==
  missing = 
  partial = 
  full    = orders_bucketed

