In [None]:
#here we are looking at how to do bucket join in spark

In [None]:
// In python use: from pyspark.sql.functions import broadcast, split, lit
import org.apache.spark.sql.functions.{broadcast, split, lit}

In [None]:
# Imports useful functions:
# broadcast: For broadcast joins (not used yet, but handy!)
# split: To split strings into arrays.
# lit: To create literal columns.

In [2]:
val matchesBucketed = spark.read.option("header", "true")
                        .option("inferSchema", "true")
                        .csv("/home/iceberg/data/matches.csv")
val matchDetailsBucketed =  spark.read.option("header", "true")
                        .option("inferSchema", "true")
                        .csv("/home/iceberg/data/match_details.csv")

matchesBucketed: org.apache.spark.sql.DataFrame = [match_id: string, mapid: string ... 8 more fields]
matchDetailsBucketed: org.apache.spark.sql.DataFrame = [match_id: string, player_gamertag: string ... 34 more fields]


In [None]:
# Reads CSV files and creates Spark DataFrames called matchesBucketed and matchDetailsBucketed.
# .option("header", "true") means Spark will use the first line as column names.
# .option("inferSchema", "true") means Spark will try to guess the correct data types for each column.

In [3]:
matchesBucketed.show(5)        // Show 5 rows from matchesBucketed
matchDetailsBucketed.show(5)   // Show 5 rows from matchDetailsBucketed

+--------------------+--------------------+------------+--------------------+--------------------+-------------+-------------------+--------------+---------+--------------------+
|            match_id|               mapid|is_team_game|         playlist_id|     game_variant_id|is_match_over|    completion_date|match_duration|game_mode|      map_variant_id|
+--------------------+--------------------+------------+--------------------+--------------------+-------------+-------------------+--------------+---------+--------------------+
|11de1a94-8d07-416...|c7edbf0f-f206-11e...|        true|f72e0ef0-7c4a-430...|1e473914-46e4-408...|         true|2016-02-22 00:00:00|          NULL|     NULL|                NULL|
|d3643e71-3e51-43e...|cb914b9e-f206-11e...|       false|d0766624-dbd7-453...|257a305e-4dd3-41f...|         true|2016-02-14 00:00:00|          NULL|     NULL|                NULL|
|d78d2aae-36e4-48a...|c7edbf0f-f206-11e...|        true|f72e0ef0-7c4a-430...|1e473914-46e4-408...|       

In [4]:
matchesBucketed.printSchema()        // See columns and data types
matchDetailsBucketed.printSchema()

root
 |-- match_id: string (nullable = true)
 |-- mapid: string (nullable = true)
 |-- is_team_game: boolean (nullable = true)
 |-- playlist_id: string (nullable = true)
 |-- game_variant_id: string (nullable = true)
 |-- is_match_over: boolean (nullable = true)
 |-- completion_date: timestamp (nullable = true)
 |-- match_duration: string (nullable = true)
 |-- game_mode: string (nullable = true)
 |-- map_variant_id: string (nullable = true)

root
 |-- match_id: string (nullable = true)
 |-- player_gamertag: string (nullable = true)
 |-- previous_spartan_rank: integer (nullable = true)
 |-- spartan_rank: integer (nullable = true)
 |-- previous_total_xp: integer (nullable = true)
 |-- total_xp: integer (nullable = true)
 |-- previous_csr_tier: integer (nullable = true)
 |-- previous_csr_designation: integer (nullable = true)
 |-- previous_csr: integer (nullable = true)
 |-- previous_csr_percent_to_next_tier: integer (nullable = true)
 |-- previous_csr_rank: integer (nullable = true)
 |-

In [5]:
matchesBucketed.count()              // Total number of rows
matchDetailsBucketed.count()

res3: Long = 151761


In [None]:
these 2 files have shared key match_id that they can join on, we are looking on match_id to be the join key
for this iceberg table

In [6]:
spark.sql("""DROP TABLE IF EXISTS bootcamp.matches_bucketed""")
val bucketedDDL = """
CREATE TABLE IF NOT EXISTS bootcamp.matches_bucketed (
     match_id STRING,
     is_team_game BOOLEAN,
     playlist_id STRING,
     completion_date TIMESTAMP
 )
 USING iceberg
 PARTITIONED BY (completion_date, bucket(16, match_id));
 """
 spark.sql(bucketedDDL

#  define new table and This means it uses Apache Iceberg, a modern table format for big data.
# PARTITIONED BY: date and buckets (this will give us ability to work with this data)
# First by completion_date (for time-based partitioning)
# Then by bucket(16, match_id) (bucketing)
# (Divides data into 16 buckets based on match_id hash value)

bucketedDDL: String =
"
CREATE TABLE IF NOT EXISTS bootcamp.matches_bucketed (
     match_id STRING,
     is_team_game BOOLEAN,
     playlist_id STRING,
     completion_date TIMESTAMP
 )
 USING iceberg
 PARTITIONED BY (completion_date, bucket(16, match_id));
 "
res4: org.apache.spark.sql.DataFrame = []


In [8]:
//If you want to double-check that it’s gone, you can list tables in the database:
spark.sql("SHOW TABLES IN bootcamp").show()

+---------+--------------------+-----------+
|namespace|           tableName|isTemporary|
+---------+--------------------+-----------+
| bootcamp|              events|      false|
| bootcamp|events_aggregated...|      false|
| bootcamp|       events_sorted|      false|
| bootcamp|    events_sorted_v2|      false|
| bootcamp|     events_unsorted|      false|
| bootcamp|  events_unsorted_v2|      false|
| bootcamp|    matches_bucketed|      false|
+---------+--------------------+-----------+



In [None]:
Why bucket?
When you join with another table bucketed on the same column and same number of buckets, Spark can optimize the join to avoid shuffling!

In [9]:
// See the table structure (columns, partitioning)
spark.sql("DESCRIBE FORMATTED bootcamp.matches_bucketed").show(100, false)

+----------------------------+-------------------------------------------------------------------------------------------------------+-------+
|col_name                    |data_type                                                                                              |comment|
+----------------------------+-------------------------------------------------------------------------------------------------------+-------+
|match_id                    |string                                                                                                 |NULL   |
|is_team_game                |boolean                                                                                                |NULL   |
|playlist_id                 |string                                                                                                 |NULL   |
|completion_date             |timestamp                                                                                              |NULL   |

In [None]:
  matchesBucketed.select(
     $"match_id", $"is_team_game", $"playlist_id", $"completion_date"
     )
    .write.mode("append")
    .partitionBy("completion_date")
  .bucketBy(16, "match_id").saveAsTable("bootcamp.matches_bucketed")

In [None]:
#this is how we are going to writeout the data into the buckets
# matchesBucketed.select(...)
# Selects only the columns you want to write into the new table.

# .write
# Tells Spark you want to write the DataFrame somewhere.

# .mode("append")
# Adds the data to the table if it already exists (doesn’t overwrite).

# .partitionBy("completion_date")
# Partitions the table by completion_date — files will be organized by date.

# .bucketBy(16, "match_id")
# Buckets the data into 16 files based on a hash of match_id.
# (This matches your table definition. Bucketing improves join performance.)

# .saveAsTable("bootcamp.matches_bucketed")
# Actually writes the data to the Iceberg table you created.

In [None]:
spark.table("bootcamp.matches_bucketed").count()

In [None]:
spark.table("bootcamp.matches_bucketed").show(5)

In [None]:
spark.sql("SHOW PARTITIONS bootcamp.matches_bucketed").show(100, false)

In [None]:
spark.sql("""
  SELECT completion_date, COUNT(*) 
  FROM bootcamp.matches_bucketed 
  GROUP BY completion_date
""").show()

In [None]:

 val bucketedDetailsDDL = """
 CREATE TABLE IF NOT EXISTS bootcamp.match_details_bucketed (
     match_id STRING,
     player_gamertag STRING,
     player_total_kills INTEGER,
     player_total_deaths INTEGER
 )
 USING iceberg
 PARTITIONED BY (bucket(16, match_id));
 """
spark.sql(bucketedDetailsDDL)

In [None]:
#  creating another bucketed table for your match details! 
#  Instead of traditional partitions, you’re just bucketing this table by match_id into 16 buckets.

# No date-based partitioning here, just bucketing.

In [None]:
spark.sql("SHOW TABLES IN bootcamp").show()

spark.sql("DESCRIBE FORMATTED bootcamp.match_details_bucketed").show(100, false)

In [None]:
# 2. See if buckets are recognized (partitions column will mention buckets)
# In the output of DESCRIBE FORMATTED, look under # Partitioning — you should see info about the bucketing.

In [None]:
# Why is this important?
# Now both tables (matches_bucketed and match_details_bucketed) are bucketed by match_id with the same number of buckets.

# This sets you up for a bucketed join, which is faster because Spark can join files directly without shuffling data.

In [None]:
 matchDetailsBucketed.select(
     $"match_id", $"player_gamertag", $"player_total_kills", $"player_total_deaths")
     .write.mode("append")
   .bucketBy(16, "match_id").saveAsTable("bootcamp.match_details_bucketed")

In [None]:
we are essentially moving data from csv files to bucketed iceberg table, if we bucket things we dont have to shuffle 

In [1]:

// In python use: from pyspark.sql.functions import broadcast, split, lit
import org.apache.spark.sql.functions.{broadcast, split, lit}


val matchesBucketed = spark.read.option("header", "true")
                        .option("inferSchema", "true")
                        .csv("/home/iceberg/data/matches.csv")
val matchDetailsBucketed =  spark.read.option("header", "true")
                        .option("inferSchema", "true")
                        .csv("/home/iceberg/data/match_details.csv")


spark.sql("""DROP TABLE IF EXISTS bootcamp.matches_bucketed""")
val bucketedDDL = """
CREATE TABLE IF NOT EXISTS bootcamp.matches_bucketed (
     match_id STRING,
     is_team_game BOOLEAN,
     playlist_id STRING,
     completion_date TIMESTAMP
 )
 USING iceberg
 PARTITIONED BY (completion_date, bucket(16, match_id));
 """
 spark.sql(bucketedDDL)

// matchesBucketed.select(
//     $"match_id", $"is_team_game", $"playlist_id", $"completion_date"
//     )
//     .write.mode("append")
//     .partitionBy("completion_date")
//   .bucketBy(16, "match_id").saveAsTable("bootcamp.matches_bucketed")


// val bucketedDetailsDDL = """
// CREATE TABLE IF NOT EXISTS bootcamp.match_details_bucketed (
//     match_id STRING,
//     player_gamertag STRING,
//     player_total_kills INTEGER,
//     player_total_deaths INTEGER
// )
// USING iceberg
// PARTITIONED BY (bucket(16, match_id));
// """
// spark.sql(bucketedDetailsDDL)

// matchDetailsBucketed.select(
//     $"match_id", $"player_gamertag", $"player_total_kills", $"player_total_deaths")
//     .write.mode("append")
//   .bucketBy(16, "match_id").saveAsTable("bootcamp.match_details_bucketed")

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

matchesBucketed.createOrReplaceTempView("matches")
matchDetailsBucketed.createOrReplaceTempView("match_details")

spark.sql("""
    SELECT * FROM bootcamp.match_details_bucketed mdb JOIN bootcamp.matches_bucketed md 
    ON mdb.match_id = md.match_id
    AND md.completion_date = DATE('2016-01-01')
        
""").explain()


spark.sql("""
    SELECT * FROM match_details mdb JOIN matches md ON mdb.match_id = md.match_id
        
""").explain()

// spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "1000000000000")

// val broadcastFromThreshold = matches.as("m").join(matchDetails.as("md"), $"m.match_id" === $"md.match_id")
//   .select($"m.completion_date", $"md.player_gamertag",  $"md.player_total_kills")
//   .take(5)

// val explicitBroadcast = matches.as("m").join(broadcast(matchDetails).as("md"), $"m.match_id" === $"md.match_id")
//   .select($"md.*", split($"completion_date", " ").getItem(0).as("ds"))

// val bucketedValues = matchDetailsBucketed.as("mdb").join(matchesBucketed.as("mb"), $"mb.match_id" === $"mdb.match_id").explain()
// // .take(5)

// val values = matchDetailsBucketed.as("m").join(matchesBucketed.as("md"), $"m.match_id" === $"md.match_id").explain()

// explicitBroadcast.write.mode("overwrite").insertInto("match_details_bucketed")

// matches.withColumn("ds", split($"completion_date", " ").getItem(0)).write.mode("overwrite").insertInto("matches_bucketed")

// spark.sql(bucketedSQL)



Intitializing Scala interpreter ...

Spark Web UI available at http://b79bc3db1652:4041
SparkContext available as 'sc' (version = 3.5.5, master = local[*], app id = local-1753646333178)
SparkSession available as 'spark'


org.apache.spark.sql.catalyst.ExtendedAnalysisException:  [TABLE_OR_VIEW_NOT_FOUND] The table or view `bootcamp`.`match_details_bucketed` cannot be found. Verify the spelling and correctness of the schema and catalog.