<a href="https://colab.research.google.com/github/keertu24/databricks_1/blob/main/75_Bucketing.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from pyspark.sql import SparkSession

# Obtain a Spark session named "ColabSpark"; getOrCreate() returns the
# already-running session when there is one instead of starting another.
spark = (
    SparkSession.builder
    .appName("ColabSpark")
    .getOrCreate()
)


## Check If Bucketing Is Enabled

In [2]:
# Read the current value of the session option that governs bucketing support.
# Left as the cell's last expression so the notebook displays the returned string.
spark.conf.get("spark.sql.sources.bucketing.enabled")

'true'

## Create Sample Data for Demo

In [19]:
from pyspark.sql.functions import col, rand

# Demo frame: ids from 1 up to (but excluding) 10,000,000, spread over 10
# partitions and exposed as "PK", plus a random "Attribute" column generated
# with a fixed seed (10) so reruns produce the same values.
df = (
    spark.range(start=1, end=10000000, step=1, numPartitions=10)
    .select(
        col("id").alias("PK"),
        rand(seed=10).alias("Attribute"),
    )
)

# Preview the first rows.
df.show()

+---+-------------------+
| PK|          Attribute|
+---+-------------------+
|  1| 0.1709497137955568|
|  2| 0.8051143958005459|
|  3| 0.5775925576589018|
|  4| 0.9476047869880925|
|  5|    0.2093704977577|
|  6|0.36664222617947817|
|  7| 0.8078688178371882|
|  8| 0.7135143433452461|
|  9| 0.7195325566306053|
| 10|0.31335292311175456|
| 11| 0.8062503712025726|
| 12|0.10814914646176654|
| 13| 0.3362232980701172|
| 14| 0.8133304803837667|
| 15|0.47649428738170896|
| 16|  0.524728096293865|
| 17| 0.9701253460019921|
| 18| 0.6232167713919952|
| 19| 0.5089687568245219|
| 20| 0.5467504094508642|
+---+-------------------+
only showing top 20 rows


## Create Non-Bucketed Table

In [21]:
# Persist the sample data as a plain (non-bucketed) Parquet table,
# replacing any table of the same name left over from an earlier run.
(
    df.write
    .format("parquet")
    .mode("overwrite")
    .saveAsTable("nonbucketedTable")
)

## Create Bucketed Table

In [22]:
# Persist the same data as a Parquet table bucketed into 10 buckets on "PK",
# replacing any table of the same name left over from an earlier run.
(
    df.write
    .format("parquet")
    .bucketBy(10, "PK")
    .mode("overwrite")
    .saveAsTable("bucktedTable")
)

In [23]:
# Two separate handles per saved table, so each join below has a distinct
# left and right input even when joining a table with itself.

# Bucketed table.
df1, df2 = spark.table("bucktedTable"), spark.table("bucktedTable")

# Non-bucketed table.
df3, df4 = spark.table("nonbucketedTable"), spark.table("nonbucketedTable")

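## Broadcast Join Is Chosen by Default Only if One Side Is Smaller Than 10 MB

The plan below shows a SortMergeJoin rather than a broadcast join, which indicates these tables are above the default `spark.sql.autoBroadcastJoinThreshold`.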

In [24]:
# Physical plan for joining the non-bucketed table with itself on PK,
# using the session's default join settings.
df3.join(df4, on="PK", how="inner").explain()

== Physical Plan ==
*(5) Project [PK#43L, Attribute#44, Attribute#46]
+- *(5) SortMergeJoin [PK#43L], [PK#45L], Inner
   :- *(2) Sort [PK#43L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(PK#43L, 200), ENSURE_REQUIREMENTS, [plan_id=566]
   :     +- *(1) Filter isnotnull(PK#43L)
   :        +- *(1) ColumnarToRow
   :           +- FileScan parquet spark_catalog.default.nonbucketedtable[PK#43L,Attribute#44] Batched: true, DataFilters: [isnotnull(PK#43L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/content/spark-warehouse/nonbucketedtable], PartitionFilters: [], PushedFilters: [IsNotNull(PK)], ReadSchema: struct<PK:bigint,Attribute:double>
   +- *(4) Sort [PK#45L ASC NULLS FIRST], false, 0
      +- ReusedExchange [PK#45L, Attribute#46], Exchange hashpartitioning(PK#43L, 200), ENSURE_REQUIREMENTS, [plan_id=566]




## Disable Broadcast Join

In [25]:
# Set the auto-broadcast size threshold to -1, which turns automatic broadcast joins off.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold",-1)
# Switch adaptive query execution off as well.
# NOTE(review): presumably so explain() shows the static plan unchanged by runtime re-optimization — confirm.
spark.conf.set("spark.sql.adaptive.enabled",False)

In [26]:
# Same non-bucketed self-join on PK; print its physical plan again
# to compare against the plan printed earlier.
df3.join(df4, on="PK", how="inner").explain()

== Physical Plan ==
*(5) Project [PK#43L, Attribute#44, Attribute#50]
+- *(5) SortMergeJoin [PK#43L], [PK#49L], Inner
   :- *(2) Sort [PK#43L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(PK#43L, 200), ENSURE_REQUIREMENTS, [plan_id=653]
   :     +- *(1) Filter isnotnull(PK#43L)
   :        +- *(1) ColumnarToRow
   :           +- FileScan parquet spark_catalog.default.nonbucketedtable[PK#43L,Attribute#44] Batched: true, DataFilters: [isnotnull(PK#43L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/content/spark-warehouse/nonbucketedtable], PartitionFilters: [], PushedFilters: [IsNotNull(PK)], ReadSchema: struct<PK:bigint,Attribute:double>
   +- *(4) Sort [PK#49L ASC NULLS FIRST], false, 0
      +- ReusedExchange [PK#49L, Attribute#50], Exchange hashpartitioning(PK#43L, 200), ENSURE_REQUIREMENTS, [plan_id=653]




In [36]:
# Execute the non-bucketed self-join and report how many rows it yields.
df3.join(df4, on="PK", how="inner").count()

9999999

## Non-Bucketed to Bucketed Join: Only the Non-Bucketed Side Is Shuffled

In [27]:
# Physical plan for joining the non-bucketed table (left) with the
# bucketed table (right) on PK.
df3.join(df2, on="PK", how="inner").explain()

== Physical Plan ==
*(4) Project [PK#43L, Attribute#44, Attribute#42]
+- *(4) SortMergeJoin [PK#43L], [PK#41L], Inner
   :- *(2) Sort [PK#43L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(PK#43L, 10), ENSURE_REQUIREMENTS, [plan_id=739]
   :     +- *(1) Filter isnotnull(PK#43L)
   :        +- *(1) ColumnarToRow
   :           +- FileScan parquet spark_catalog.default.nonbucketedtable[PK#43L,Attribute#44] Batched: true, DataFilters: [isnotnull(PK#43L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/content/spark-warehouse/nonbucketedtable], PartitionFilters: [], PushedFilters: [IsNotNull(PK)], ReadSchema: struct<PK:bigint,Attribute:double>
   +- *(3) Sort [PK#41L ASC NULLS FIRST], false, 0
      +- *(3) Filter isnotnull(PK#41L)
         +- *(3) ColumnarToRow
            +- FileScan parquet spark_catalog.default.bucktedtable[PK#41L,Attribute#42] Batched: true, Bucketed: true, DataFilters: [isnotnull(PK#41L)], Format: Parquet, Location: InMemoryFileIndex(1 pat

In [35]:
# Execute the non-bucketed x bucketed join and report its row count.
df3.join(df2, on="PK", how="inner").count()

9999999

## Both Tables Bucketed: No Shuffle in the Plan

In [28]:
# Physical plan for joining the bucketed table with itself on PK,
# the column it was bucketed by.
df1.join(df2, on="PK", how="inner").explain()

== Physical Plan ==
*(3) Project [PK#41L, Attribute#42, Attribute#54]
+- *(3) SortMergeJoin [PK#41L], [PK#53L], Inner
   :- *(1) Sort [PK#41L ASC NULLS FIRST], false, 0
   :  +- *(1) Filter isnotnull(PK#41L)
   :     +- *(1) ColumnarToRow
   :        +- FileScan parquet spark_catalog.default.bucktedtable[PK#41L,Attribute#42] Batched: true, Bucketed: true, DataFilters: [isnotnull(PK#41L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/content/spark-warehouse/bucktedtable], PartitionFilters: [], PushedFilters: [IsNotNull(PK)], ReadSchema: struct<PK:bigint,Attribute:double>, SelectedBucketsCount: 10 out of 10
   +- *(2) Sort [PK#53L ASC NULLS FIRST], false, 0
      +- *(2) Filter isnotnull(PK#53L)
         +- *(2) ColumnarToRow
            +- FileScan parquet spark_catalog.default.bucktedtable[PK#53L,Attribute#54] Batched: true, Bucketed: true, DataFilters: [isnotnull(PK#53L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/content/spark-warehouse/bucktedtable],

In [33]:
# Execute the bucketed self-join and report its row count.
df1.join(df2, on="PK", how="inner").count()

9999999