In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr, col
spark = SparkSession.builder.appName("Jupyter").getOrCreate()

spark

events = spark.read.option("header", "true").csv("/home/iceberg/data/events.csv").withColumn("event_date", expr("DATE_TRUNC('day', event_time)"))
devices = spark.read.option("header","true").csv("/home/iceberg/data/devices.csv")

df = events.join(devices,on="device_id",how="left")
df = df.withColumnsRenamed({'browser_type': 'browser_family', 'os_type': 'os_family'})

df.collect()

                                                                                

[Row(device_id='532630305', user_id='1037710827', referrer=None, host='www.zachwilson.tech', url='/', event_time='2021-03-08 17:27:24.241000', event_date=datetime.datetime(2021, 3, 8, 0, 0), browser_family='Other', os_family='Other', device_type='Other'),
 Row(device_id='532630305', user_id='925588856', referrer=None, host='www.eczachly.com', url='/', event_time='2021-05-10 11:26:21.247000', event_date=datetime.datetime(2021, 5, 10, 0, 0), browser_family='Other', os_family='Other', device_type='Other'),
 Row(device_id='532630305', user_id='-1180485268', referrer=None, host='admin.zachwilson.tech', url='/', event_time='2021-02-17 16:19:30.738000', event_date=datetime.datetime(2021, 2, 17, 0, 0), browser_family='Other', os_family='Other', device_type='Other'),
 Row(device_id='532630305', user_id='-1044833855', referrer=None, host='www.zachwilson.tech', url='/', event_time='2021-09-24 15:53:14.466000', event_date=datetime.datetime(2021, 9, 24, 0, 0), browser_family='Other', os_family='Oth

In [5]:
sorted = df.repartition(10, col("event_date"))\
    .sortWithinPartitions(col("event_date"), col("host"))\
    .withColumn("event_time", col("event_time").cast("timestamp")) 

sortedTwo = df.repartition(10, col("event_date"))\
    .sort(col("event_date"), col("host"))\
    .withColumn("event_time", col("event_time").cast("timestamp")) 

sorted.show()
sortedTwo.show()


+-----------+-----------+--------------------+--------------------+--------------------+--------------------+-------------------+
|    user_id|  device_id|            referrer|                host|                 url|          event_time|         event_date|
+-----------+-----------+--------------------+--------------------+--------------------+--------------------+-------------------+
| 1129583063|  532630305|                NULL|admin.zachwilson....|                   /|2021-01-07 09:21:...|2021-01-07 00:00:00|
| -648945006| 1088283544|                NULL|    www.eczachly.com|                   /|2021-01-07 02:58:...|2021-01-07 00:00:00|
|-1871780024| -158310583|                NULL|    www.eczachly.com|                   /|2021-01-07 04:17:...|2021-01-07 00:00:00|
|  203689086| 1088283544|                NULL|    www.eczachly.com|/blog/what-exactl...|2021-01-07 10:03:...|2021-01-07 00:00:00|
|-1180485268|  532630305|                NULL|    www.eczachly.com|                   /|20

In [None]:
# .sortWithinPartitions() sorts within partitions, whereas .sort() is a global sort, which is very slow

# Note - exchange is synonymous with Shuffle

In [6]:
sorted = df.repartition(10, col("event_date"))\
    .sortWithinPartitions(col("event_date"), col("host"))\
    .withColumn("event_time", col("event_time").cast("timestamp")) 

sortedTwo = df.repartition(10, col("event_date"))\
    .sort(col("event_date"), col("host"))\
    .withColumn("event_time", col("event_time").cast("timestamp")) 

sorted.explain()
sortedTwo.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [user_id#17, device_id#18, referrer#19, host#20, url#21, cast(event_time#22 as timestamp) AS event_time#288, event_date#29]
   +- Sort [event_date#29 ASC NULLS FIRST, host#20 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(event_date#29, 10), REPARTITION_BY_NUM, [plan_id=294]
         +- Project [user_id#17, device_id#18, referrer#19, host#20, url#21, event_time#22, date_trunc(day, cast(event_time#22 as timestamp), Some(Etc/UTC)) AS event_date#29]
            +- FileScan csv [user_id#17,device_id#18,referrer#19,host#20,url#21,event_time#22] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/iceberg/data/events.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<user_id:string,device_id:string,referrer:string,host:string,url:string,event_time:string>


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [user_id#17, device_id#18, referr

In [7]:
%%sql

CREATE DATABASE IF NOT EXISTS bootcamp

In [20]:
%%sql

DROP TABLE IF EXISTS bootcamp.events

In [21]:
%%sql

DROP TABLE IF EXISTS bootcamp.events_sorted

In [22]:
%%sql

CREATE TABLE IF NOT EXISTS bootcamp.events (
    url STRING,
    referrer STRING,
    browser_family STRING,
    os_family STRING,
    device_family STRING,
    host STRING,
    event_time TIMESTAMP,
    event_date DATE
)
USING iceberg
PARTITIONED BY (years(event_date));


In [23]:
%%sql


CREATE TABLE IF NOT EXISTS bootcamp.events_sorted (
    url STRING,
    referrer STRING,
    browser_family STRING,
    os_family STRING,
    device_family STRING,
    host STRING,
    event_time TIMESTAMP,
    event_date DATE
)
USING iceberg
PARTITIONED BY (years(event_date));

In [24]:
%%sql


CREATE TABLE IF NOT EXISTS bootcamp.events_unsorted (
    url STRING,
    referrer STRING,
    browser_family STRING,
    os_family STRING,
    device_family STRING,
    host STRING,
    event_time TIMESTAMP,
    event_date DATE
)
USING iceberg
PARTITIONED BY (year(event_date));

In [26]:

start_df = df.repartition(4, col("event_date")).withColumn("event_time", col("event_time").cast("timestamp")) \
    
first_sort_df = start_df.sortWithinPartitions(col("event_date"), col("host"))

start_df.write.mode("overwrite").saveAsTable("bootcamp.events_unsorted")
first_sort_df.write.mode("overwrite").saveAsTable("bootcamp.events_sorted")

                                                                                

In [121]:
%%sql

SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files, 'sorted' 
FROM demo.bootcamp.events_sorted.files

UNION ALL
SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files, 'unsorted' 
FROM demo.bootcamp.events_unsorted.files





size,num_files,sorted
2896920,4,sorted
3211534,4,unsorted


In [90]:
%%sql
SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files FROM demo.bootcamp.events.files;

size,num_files
3145713,5


In [None]:
%%sql 
SELECT COUNT(1) FROM bootcamp.matches_bucketed.files

count(1)
3665
