In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr, col

# Create a SparkSession
spark = SparkSession.builder.appName("Jupyter").getOrCreate()

# Read the events CSV file and create a new column 'event_date'
events = spark.read.option("header", "true").csv("/home/iceberg/data/events.csv").withColumn("event_date", expr("DATE_TRUNC('day', event_time)"))

# Read the devices CSV file
devices = spark.read.option("header", "true").csv("/home/iceberg/data/devices.csv")

# Join the events and devices DataFrames
joined_df = events.join(devices, on="device_id", how="left")

# Rename columns using select and cast 'event_time' correctly
df = joined_df.select(
    "url",
    "referrer",
    col("browser_type").alias("browser_family"),
    col("os_type").alias("os_family"),
    col("device_type").alias("device_family"),
    "host",
    "event_time",
    "event_date"
)

# Show the result
df.show()

In [None]:
sorted = df.repartition(10, col("event_date")) \
        .sortWithinPartitions(col("event_date"), col("host"), col("browser_family")) \
        .withColumn("event_time", col("event_time").cast("timestamp")) \

sorted.show()

In [None]:
%%sql

CREATE DATABASE IF NOT EXISTS bootcamp

In [None]:
%%sql

DROP TABLE IF EXISTS bootcamp.eventsdevices

In [None]:
%%sql

CREATE TABLE IF NOT EXISTS bootcamp.eventsdevices (
    url STRING,
    referrer STRING,
    browser_family STRING,
    os_family STRING,
    device_family STRING,
    host STRING,
    event_time TIMESTAMP,
    event_date DATE
)
USING iceberg
PARTITIONED BY (years(event_date));


In [None]:
# DataFrame with data to be inserted
sorted.write.format("iceberg").mode("append").saveAsTable("bootcamp.eventsdevices")

In [None]:
%%sql
select * from bootcamp.eventsdevices limit 5;

In [None]:
%%sql


CREATE TABLE IF NOT EXISTS bootcamp.events_sorted (
    url STRING,
    referrer STRING,
    browser_family STRING,
    os_family STRING,
    device_family STRING,
    host STRING,
    event_time TIMESTAMP,
    event_date DATE
)
USING iceberg;

In [None]:
%%sql


CREATE TABLE IF NOT EXISTS bootcamp.events_unsorted (
    url STRING,
    referrer STRING,
    browser_family STRING,
    os_family STRING,
    device_family STRING,
    host STRING,
    event_time TIMESTAMP,
    event_date DATE
)
USING iceberg;

In [None]:

start_df = df.repartition(4, col("event_date")).withColumn("event_time", col("event_time").cast("timestamp")) \
    

first_sort_df = start_df.sortWithinPartitions(col("event_date"), col("browser_family"), col("host"))

sorted = df.repartition(10, col("event_date")) \
        .sortWithinPartitions(col("event_date")) \
        .withColumn("event_time", col("event_time").cast("timestamp")) \

start_df.write.mode("overwrite").saveAsTable("bootcamp.events_unsorted")
first_sort_df.write.mode("overwrite").saveAsTable("bootcamp.events_sorted")

In [None]:
%%sql
SELECT *
FROM demo.bootcamp.events_sorted.files

In [None]:
%%sql

SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files, 'sorted' 
FROM demo.bootcamp.events_sorted.files

UNION ALL
SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files, 'unsorted' 
FROM demo.bootcamp.events_unsorted.files


In [None]:
%%sql
SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files FROM demo.bootcamp.eventsdevices.files;

In [None]:
%%sql 
SELECT COUNT(1) as num_files FROM bootcamp.matches_bucketed.files