In [15]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr, col, lit

# Attach to the already-running Spark session when there is one, otherwise start it.
spark = SparkSession.builder.appName("zack_data_eng").getOrCreate()

# Both CSV files are read with a header row; no schema-inference option is set.
events = (
    spark.read
    .option("header", "true")
    .csv("/home/iceberg/data/events.csv")
    # DATE_TRUNC('day', ...) keeps the time component but zeroes it, so the
    # derived column displays as e.g. 2021-03-08 00:00:00.
    .withColumn("event_date", expr("DATE_TRUNC('day', event_time)"))
)
devices = (
    spark.read
    .option("header", "true")
    .csv("/home/iceberg/data/devices.csv")
)

# Left join: every event row is kept, device attributes are attached via device_id.
# Only browser_type and os_type are renamed; device_type keeps its original name.
df = (
    events
    .join(devices, on="device_id", how="left")
    .withColumnsRenamed({"browser_type": "browser_family", "os_type": "os_family"})
)

df.show()

+----------+-----------+--------+--------------------+----------+--------------------+-------------------+--------------+---------+-----------+
| device_id|    user_id|referrer|                host|       url|          event_time|         event_date|browser_family|os_family|device_type|
+----------+-----------+--------+--------------------+----------+--------------------+-------------------+--------------+---------+-----------+
| 532630305| 1037710827|    NULL| www.zachwilson.tech|         /|2021-03-08 17:27:...|2021-03-08 00:00:00|         Other|    Other|      Other|
| 532630305|  925588856|    NULL|    www.eczachly.com|         /|2021-05-10 11:26:...|2021-05-10 00:00:00|         Other|    Other|      Other|
| 532630305|-1180485268|    NULL|admin.zachwilson....|         /|2021-02-17 16:19:...|2021-02-17 00:00:00|         Other|    Other|      Other|
| 532630305|-1044833855|    NULL| www.zachwilson.tech|         /|2021-09-24 15:53:...|2021-09-24 00:00:00|         Other|    Other|     

In [22]:
# Alternative starting point kept for experimentation:
# df.repartition(10, col("event_date")) could replace `df` as the input of
# either pipeline below.

# Variant 1: rows are ordered inside each existing partition only.
# NOTE(review): the name `sorted` hides Python's built-in sorted() for the rest
# of the session; it is left unchanged because cells outside this view may use it.
sorted = (
    df.sortWithinPartitions("event_date", "host", "browser_family")
    .withColumn("event_time", col("event_time").cast("timestamp"))
)

# Variant 2: same keys, but a total ordering across the whole DataFrame.
sorted_2 = (
    df.sort("event_date", "host", "browser_family")
    .withColumn("event_time", col("event_time").cast("timestamp"))
)

# To compare the two physical plans, run sorted.explain() and sorted_2.explain()
# (or call .show() on either to look at the rows).

In [23]:
%%sql
    
-- Create the `bootcamp` database when it is not already present; re-running is a no-op.
CREATE DATABASE IF NOT EXISTS bootcamp

25/08/26 01:25:30 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [24]:
%%sql
    
-- Remove bootcamp.events if it exists (IF EXISTS keeps a first run from failing),
-- presumably so the CREATE TABLE cell that follows starts from a clean definition.
DROP TABLE IF EXISTS bootcamp.events

In [None]:
%%sql

-- Define the Iceberg table bootcamp.events.
-- NOTE(review): the DataFrame built earlier in this notebook exposes the device
-- column as `device_type` (only browser_type and os_type are renamed) — confirm
-- that `device_family` is the intended name before writing that DataFrame here.
-- NOTE(review): event_date is derived earlier with DATE_TRUNC and displays as a
-- midnight timestamp, while this column is DATE — confirm the write converts it.
CREATE TABLE IF NOT EXISTS bootcamp.events(
    url STRING,
    referrer STRING,
    browser_family STRING,
    os_family STRING,
    device_family STRING,
    host STRING,
    event_time TIMESTAMP,
    event_date DATE
)
USING iceberg
-- Partition by the calendar year of event_date.
PARTITIONED BY (years(event_date));