In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr, col

# Create a SparkSession
spark = SparkSession.builder.appName("Jupyter").getOrCreate()

# Read the events CSV file and create a new column 'event_date'
events = spark.read.option("header", "true").csv("/home/iceberg/data/events.csv").withColumn("event_date", expr("DATE_TRUNC('day', event_time)"))

# Read the devices CSV file
devices = spark.read.option("header", "true").csv("/home/iceberg/data/devices.csv")

# Join the events and devices DataFrames
joined_df = events.join(devices, on="device_id", how="left")

# Rename columns using select and cast 'event_time' correctly
df = joined_df.select(
    "url",
    "referrer",
    col("browser_type").alias("browser_family"),
    col("os_type").alias("os_family"),
    col("device_type").alias("device_family"),
    "host",
    "event_time",
    "event_date"
)

# Show the result
df.show()

24/12/03 00:30:10 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


+----------+--------+--------------+---------+-------------+--------------------+--------------------+-------------------+
|       url|referrer|browser_family|os_family|device_family|                host|          event_time|         event_date|
+----------+--------+--------------+---------+-------------+--------------------+--------------------+-------------------+
|         /|    NULL|         Other|    Other|        Other| www.zachwilson.tech|2021-03-08 17:27:...|2021-03-08 00:00:00|
|         /|    NULL|         Other|    Other|        Other|    www.eczachly.com|2021-05-10 11:26:...|2021-05-10 00:00:00|
|         /|    NULL|         Other|    Other|        Other|admin.zachwilson....|2021-02-17 16:19:...|2021-02-17 00:00:00|
|         /|    NULL|         Other|    Other|        Other| www.zachwilson.tech|2021-09-24 15:53:...|2021-09-24 00:00:00|
|         /|    NULL|         Other|    Other|        Other| www.zachwilson.tech|2021-09-26 16:03:...|2021-09-26 00:00:00|
|         /|    

In [7]:
sorted = df.repartition(10, col("event_date")) \
        .sortWithinPartitions(col("event_date"), col("host"), col("browser_family")) \
        .withColumn("event_time", col("event_time").cast("timestamp")) \

sortedTwo = df.repartition(10, col("event_date")) \
        .sort(col("event_date"), col("host"), col("browser_family")) \
        .withColumn("event_time", col("event_time").cast("timestamp")) \

sortedTwo.explain()
sorted.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [url#218, referrer#216, browser_family#271, os_family#272, device_family#273, host#217, cast(event_time#219 as timestamp) AS event_time#333, event_date#226]
   +- Sort [event_date#226 ASC NULLS FIRST, host#217 ASC NULLS FIRST, browser_family#271 ASC NULLS FIRST], true, 0
      +- Exchange rangepartitioning(event_date#226 ASC NULLS FIRST, host#217 ASC NULLS FIRST, browser_family#271 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [plan_id=264]
         +- Exchange hashpartitioning(event_date#226, 10), REPARTITION_BY_NUM, [plan_id=262]
            +- Project [url#218, referrer#216, browser_type#253 AS browser_family#271, os_type#254 AS os_family#272, device_type#255 AS device_family#273, host#217, event_time#219, event_date#226]
               +- BroadcastHashJoin [device_id#215], [device_id#252], LeftOuter, BuildRight, false
                  :- Project [device_id#215, referrer#216, host#217, url#218, event_time#219, date_tr

In [8]:
%%sql

CREATE DATABASE IF NOT EXISTS bootcamp

In [9]:
%%sql

DROP TABLE IF EXISTS bootcamp.events

In [12]:
%%sql

CREATE TABLE IF NOT EXISTS bootcamp.events (
    url STRING,
    referrer STRING,
    browser_family STRING,
    os_family STRING,
    device_family STRING,
    host STRING,
    event_time TIMESTAMP,
    event_date DATE
)
USING iceberg
PARTITIONED BY (event_date);


In [19]:
%%sql

CREATE TABLE IF NOT EXISTS bootcamp.events_sorted (
    url STRING,
    referrer STRING,
    browser_family STRING,
    os_family STRING,
    device_family STRING,
    host STRING,
    event_time TIMESTAMP,
    event_date DATE
)
USING iceberg
PARTITIONED BY (event_date);

In [17]:
%%sql

CREATE TABLE IF NOT EXISTS bootcamp.events_unsorted (
     url STRING,
     referrer STRING,
     browser_family STRING,
     os_family STRING,
     device_family STRING,
    host STRING,
    event_time TIMESTAMP,
    event_date DATE
)
USING iceberg
PARTITIONED BY (event_date);

In [20]:

start_df = df.repartition(4, col("event_date")).withColumn("event_time", col("event_time").cast("timestamp")) \
    

first_sort_df = start_df.sortWithinPartitions(col("event_date"), col("browser_family"), col("host"))

# sorted = df.repartition(10, col("event_date")) \
#         .sortWithinPartitions(col("event_date")) \
#         .withColumn("event_time", col("event_time").cast("timestamp")) \

start_df.write.mode("overwrite").saveAsTable("bootcamp.events_unsorted")
first_sort_df.write.mode("overwrite").saveAsTable("bootcamp.events_sorted")

                                                                                

In [21]:
%%sql

SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files, 'sorted' 
FROM demo.bootcamp.events_sorted.files

UNION ALL
SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files, 'unsorted' 
FROM demo.bootcamp.events_unsorted.files





size,num_files,sorted
3301248,4,sorted
3589113,4,unsorted


In [90]:
%%sql
SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files FROM demo.bootcamp.events.files;

size,num_files
3145713,5


In [None]:
%%sql 
SELECT COUNT(1) FROM bootcamp.matches_bucketed.files

count(1)
3665
