In [23]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr, col
# Reuse the running Spark session if one exists, otherwise start one named "Jupyter".
spark = SparkSession.builder.appName("Jupyter").getOrCreate()

# Events CSV (read with a header row); event_date is event_time truncated to the day.
events = (
    spark.read
    .option("header", "true")
    .csv("/home/iceberg/data/events.csv")
    .withColumn("event_date", expr("DATE_TRUNC('day', event_time)"))
)

# Devices CSV (read with a header row).
devices = (
    spark.read
    .option("header", "true")
    .csv("/home/iceberg/data/devices.csv")
)

# Left join on device_id keeps every event row, then two device columns are renamed
# to the *_family names used by the rest of the notebook.
df = (
    events
    .join(devices, on="device_id", how="left")
    .withColumnsRenamed({'browser_type': 'browser_family', 'os_type': 'os_family'})
)

# Preview the first 20 rows without truncating long cell values.
df.show(n=20, truncate=False)

+----------+-----------+--------+---------------------+----------+--------------------------+-------------------+--------------+---------+-----------+
|device_id |user_id    |referrer|host                 |url       |event_time                |event_date         |browser_family|os_family|device_type|
+----------+-----------+--------+---------------------+----------+--------------------------+-------------------+--------------+---------+-----------+
|532630305 |1037710827 |NULL    |www.zachwilson.tech  |/         |2021-03-08 17:27:24.241000|2021-03-08 00:00:00|Other         |Other    |Other      |
|532630305 |925588856  |NULL    |www.eczachly.com     |/         |2021-05-10 11:26:21.247000|2021-05-10 00:00:00|Other         |Other    |Other      |
|532630305 |-1180485268|NULL    |admin.zachwilson.tech|/         |2021-02-17 16:19:30.738000|2021-02-17 00:00:00|Other         |Other    |Other      |
|532630305 |-1044833855|NULL    |www.zachwilson.tech  |/         |2021-09-24 15:53:14.466000|2

In [5]:
# Two ways of ordering the same data after hash-repartitioning by event_date into
# 10 partitions. The first result is deliberately NOT named `sorted`, because that
# would shadow Python's built-in sorted() for every later cell in the kernel.
sorted_within = (
    df.repartition(10, col("event_date"))
    .sortWithinPartitions(col("event_date"), col("host"))
    .withColumn("event_time", col("event_time").cast("timestamp"))
)

sortedTwo = (
    df.repartition(10, col("event_date"))
    .sort(col("event_date"), col("host"))
    .withColumn("event_time", col("event_time").cast("timestamp"))
)

# Print both physical plans so they can be compared side by side.
sorted_within.explain()
sortedTwo.explain()


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [device_id#292, user_id#291, referrer#293, host#294, url#295, cast(event_time#296 as timestamp) AS event_time#535, event_date#303, browser_family#348, os_family#349, device_type#332]
   +- Sort [event_date#303 ASC NULLS FIRST, host#294 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(event_date#303, 10), REPARTITION_BY_NUM, [plan_id=640]
         +- Project [device_id#292, user_id#291, referrer#293, host#294, url#295, event_time#296, event_date#303, browser_type#330 AS browser_family#348, os_type#331 AS os_family#349, device_type#332]
            +- BroadcastHashJoin [device_id#292], [device_id#329], LeftOuter, BuildRight, false
               :- Project [user_id#291, device_id#292, referrer#293, host#294, url#295, event_time#296, date_trunc(day, cast(event_time#296 as timestamp), Some(Etc/UTC)) AS event_date#303]
               :  +- FileScan csv [user_id#291,device_id#292,referrer#293,host#294,url#295,eve

In [None]:
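# .sortWithinPartitions() orders rows only inside each existing partition, whereas .sort() is a global sort across all partitions, which is very slow because it needs an additional shuffle

# Note: "Exchange" in a physical plan is synonymous with a shuffle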

In [6]:
# Per-partition ordering vs. global ordering of the same repartitioned data
# (10 hash partitions on event_date). The first result avoids the name `sorted`
# so that Python's built-in sorted() stays usable in later cells.
sorted_within = (
    df.repartition(10, col("event_date"))
    .sortWithinPartitions(col("event_date"), col("host"))
    .withColumn("event_time", col("event_time").cast("timestamp"))
)

sortedTwo = (
    df.repartition(10, col("event_date"))
    .sort(col("event_date"), col("host"))
    .withColumn("event_time", col("event_time").cast("timestamp"))
)

# Print both physical plans for comparison.
sorted_within.explain()
sortedTwo.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [user_id#17, device_id#18, referrer#19, host#20, url#21, cast(event_time#22 as timestamp) AS event_time#288, event_date#29]
   +- Sort [event_date#29 ASC NULLS FIRST, host#20 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(event_date#29, 10), REPARTITION_BY_NUM, [plan_id=294]
         +- Project [user_id#17, device_id#18, referrer#19, host#20, url#21, event_time#22, date_trunc(day, cast(event_time#22 as timestamp), Some(Etc/UTC)) AS event_date#29]
            +- FileScan csv [user_id#17,device_id#18,referrer#19,host#20,url#21,event_time#22] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/iceberg/data/events.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<user_id:string,device_id:string,referrer:string,host:string,url:string,event_time:string>


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [user_id#17, device_id#18, referr

In [7]:
%%sql

-- Make sure the `bootcamp` database exists before the table cells below use it.
CREATE DATABASE IF NOT EXISTS bootcamp

In [8]:
%%sql

-- Start from a clean slate: remove bootcamp.events if it exists (no error when absent).
DROP TABLE IF EXISTS bootcamp.events

In [9]:
%%sql

-- Remove bootcamp.events_sorted if it exists (no error when absent).
-- NOTE(review): there is no matching DROP for bootcamp.events_unsorted in the visible cells — confirm.
DROP TABLE IF EXISTS bootcamp.events_sorted

In [10]:
%%sql

-- Iceberg table for events, partitioned by the year of event_date.
-- NOTE(review): no visible cell writes into this table (its `files` metadata query
-- below reports 0 files) — confirm whether it is still needed.
-- NOTE(review): the joined DataFrame exposes device_type (plus user_id / device_id),
-- not device_family — confirm the intended column list.
CREATE TABLE IF NOT EXISTS bootcamp.events (
    url STRING,
    referrer STRING,
    browser_family STRING,
    os_family STRING,
    device_family STRING,
    host STRING,
    event_time TIMESTAMP,
    event_date DATE
)
USING iceberg
PARTITIONED BY (years(event_date));


In [11]:
%%sql


-- Iceberg table intended to hold the within-partition-sorted copy of the events,
-- partitioned by the year of event_date.
-- NOTE(review): the DataFrame later saved under this name carries device_type,
-- user_id and device_id rather than device_family — confirm the intended column list.
CREATE TABLE IF NOT EXISTS bootcamp.events_sorted (
    url STRING,
    referrer STRING,
    browser_family STRING,
    os_family STRING,
    device_family STRING,
    host STRING,
    event_time TIMESTAMP,
    event_date DATE
)
USING iceberg
PARTITIONED BY (years(event_date));

In [12]:
%%sql


-- Iceberg table intended to hold the unsorted copy of the events, partitioned by
-- the year of event_date. The transform is spelled `years(...)` so this DDL matches
-- the bootcamp.events and bootcamp.events_sorted definitions.
-- NOTE(review): the DataFrame later saved under this name carries device_type,
-- user_id and device_id rather than device_family — confirm the intended column list.
CREATE TABLE IF NOT EXISTS bootcamp.events_unsorted (
    url STRING,
    referrer STRING,
    browser_family STRING,
    os_family STRING,
    device_family STRING,
    host STRING,
    event_time TIMESTAMP,
    event_date DATE
)
USING iceberg
PARTITIONED BY (years(event_date));

In [13]:

# Hash-repartition into 4 partitions by event_date and cast event_time to a timestamp.
# (Parenthesised chain replaces the dangling backslash continuation.)
start_df = (
    df.repartition(4, col("event_date"))
    .withColumn("event_time", col("event_time").cast("timestamp"))
)

# Same rows, additionally ordered by (event_date, host) inside each partition.
first_sort_df = start_df.sortWithinPartitions(col("event_date"), col("host"))

# Write both variants so their on-disk sizes can be compared afterwards.
# NOTE(review): the `.files` output further down lists device_type / user_id / device_id
# columns and an empty event_date_year partition value, which suggests this overwrite
# replaces the table definitions created by the DDL cells — confirm that is intended.
start_df.write.mode("overwrite").saveAsTable("bootcamp.events_unsorted")
first_sort_df.write.mode("overwrite").saveAsTable("bootcamp.events_sorted")

                                                                                

In [14]:
%%sql

-- Compare total data-file bytes and file count of the sorted vs. unsorted tables,
-- read from each table's `files` metadata table. The label column is aliased so the
-- result header is a real column name instead of the literal value 'sorted'.
SELECT SUM(file_size_in_bytes) AS size, COUNT(1) AS num_files, 'sorted' AS table_variant
FROM demo.bootcamp.events_sorted.files

UNION ALL

SELECT SUM(file_size_in_bytes) AS size, COUNT(1) AS num_files, 'unsorted' AS table_variant
FROM demo.bootcamp.events_unsorted.files





size,num_files,sorted
5441299,4,sorted
5553010,4,unsorted


In [15]:
%%sql
-- Total bytes and number of rows in the `files` metadata table of bootcamp.events.
SELECT
    SUM(file_size_in_bytes) AS size,
    COUNT(1) AS num_files
FROM demo.bootcamp.events.files;

size,num_files
,0


In [16]:
%%sql
-- Count the rows of the `files` metadata table for bootcamp.matches_bucketed.
-- The catalog is spelled out (demo.) to match the other metadata-table queries
-- in this notebook.
-- NOTE(review): no visible cell creates matches_bucketed; this query fails with
-- TABLE_OR_VIEW_NOT_FOUND unless that table already exists in the catalog — confirm.
SELECT COUNT(1) FROM demo.bootcamp.matches_bucketed.files

AnalysisException: [TABLE_OR_VIEW_NOT_FOUND] The table or view `bootcamp`.`matches_bucketed`.`files` cannot be found. Verify the spelling and correctness of the schema and catalog.
If you did not qualify the name with a schema, verify the current_schema() output, or qualify the name with the correct schema and catalog.
To tolerate the error on drop use DROP VIEW IF EXISTS or DROP TABLE IF EXISTS.; line 1 pos 21;
'Aggregate [unresolvedalias(count(1), None)]
+- 'UnresolvedRelation [bootcamp, matches_bucketed, files], [], false


In [20]:
%%sql
-- Show every column of the `files` metadata table for the sorted table.
SELECT *
FROM demo.bootcamp.events_sorted.files

                                                                                

content,file_path,file_format,spec_id,partition,record_count,file_size_in_bytes,column_sizes,value_counts,null_value_counts,nan_value_counts,lower_bounds,upper_bounds,key_metadata,split_offsets,equality_ids,sort_order_id,readable_metrics
0,s3://warehouse/bootcamp/events_sorted/data/00000-71-bb6b0eaa-2ea0-43ef-919a-09182b9e4425-0-00001.parquet,PARQUET,1,Row(event_date_year=None),89391,1103888,"{1: 111517, 2: 68766, 3: 48243, 4: 25723, 6: 2674, 7: 390792, 8: 2274, 9: 103859, 10: 315033, 11: 30583}","{1: 89391, 2: 89391, 3: 89391, 4: 89391, 6: 89391, 7: 89391, 8: 89391, 9: 89391, 10: 89391, 11: 89391}","{1: 0, 2: 46359, 3: 0, 4: 0, 6: 0, 7: 0, 8: 0, 9: 0, 10: 1, 11: 0}",{},"{1: bytearray(b'/'), 2: bytearray(b'52.20.78.240'), 3: bytearray(b'%E3%82%A6%E3%82%'), 4: bytearray(b'Android'), 6: bytearray(b'aashish.techcrea'), 7: bytearray(b' \xba\xe7\xb8\xa8\xb8\x05\x00'), 8: bytearray(b'\x00\xa0&\xb4\xa8\xb8\x05\x00'), 9: bytearray(b'-100210680'), 10: bytearray(b'-1000095488'), 11: bytearray(b'17MB150WB')}","{1: bytearray(b'/zzageqnf.php?Fp'), 2: bytearray(b'zachwilson.tech'), 3: bytearray(b'webprosbot'), 4: bytearray(b'iOS'), 6: bytearray(b'zachwilson.techd'), 7: bytearray(b'\xe8\xb0\x1b\x8ec\x03\x06\x00'), 8: bytearray(b'\x00\xe0dqO\x03\x06\x00'), 9: bytearray(b'999535123'), 10: bytearray(b'999884938'), 11: bytearray(b'vivo $2')}",,[4],,0,"Row(browser_family=Row(column_size=48243, value_count=89391, null_value_count=0, nan_value_count=None, lower_bound='%E3%82%A6%E3%82%', upper_bound='webprosbot'), device_id=Row(column_size=103859, value_count=89391, null_value_count=0, nan_value_count=None, lower_bound='-100210680', upper_bound='999535123'), device_type=Row(column_size=30583, value_count=89391, null_value_count=0, nan_value_count=None, lower_bound='17MB150WB', upper_bound='vivo $2'), event_date=Row(column_size=2274, value_count=89391, null_value_count=0, nan_value_count=None, lower_bound=datetime.datetime(2021, 1, 12, 0, 0), upper_bound=datetime.datetime(2023, 8, 20, 0, 0)), event_time=Row(column_size=390792, value_count=89391, null_value_count=0, nan_value_count=None, lower_bound=datetime.datetime(2021, 1, 12, 0, 1, 19, 764000), upper_bound=datetime.datetime(2023, 8, 20, 23, 59, 41, 89000)), 
host=Row(column_size=2674, value_count=89391, null_value_count=0, nan_value_count=None, lower_bound='aashish.techcrea', upper_bound='zachwilson.techd'), os_family=Row(column_size=25723, value_count=89391, null_value_count=0, nan_value_count=None, lower_bound='Android', upper_bound='iOS'), referrer=Row(column_size=68766, value_count=89391, null_value_count=46359, nan_value_count=None, lower_bound='52.20.78.240', upper_bound='zachwilson.tech'), url=Row(column_size=111517, value_count=89391, null_value_count=0, nan_value_count=None, lower_bound='/', upper_bound='/zzageqnf.php?Fp'), user_id=Row(column_size=315033, value_count=89391, null_value_count=1, nan_value_count=None, lower_bound='-1000095488', upper_bound='999884938'))"
0,s3://warehouse/bootcamp/events_sorted/data/00001-72-bb6b0eaa-2ea0-43ef-919a-09182b9e4425-0-00001.parquet,PARQUET,1,Row(event_date_year=None),99232,1245154,"{1: 145848, 2: 73756, 3: 48033, 4: 34566, 6: 3294, 7: 435861, 8: 2355, 9: 117090, 10: 344927, 11: 34955}","{1: 99232, 2: 99232, 3: 99232, 4: 99232, 6: 99232, 7: 99232, 8: 99232, 9: 99232, 10: 99232, 11: 99232}","{1: 0, 2: 49299, 3: 0, 4: 0, 6: 0, 7: 0, 8: 0, 9: 0, 10: 58, 11: 0}",{},"{1: bytearray(b'""/?""""<?=print(93'), 2: bytearray(b'""https://www.goo'), 3: bytearray(b') Bot'), 4: bytearray(b'Android'), 6: bytearray(b'abhishekanand.te'), 7: bytearray(b'(\x83\xb2EX\xb8\x05\x00'), 8: bytearray(b'\x00 \xc9<X\xb8\x05\x00'), 9: bytearray(b'-100210680'), 10: bytearray(b'-1000370060'), 11: bytearray(b'13 Pro Max')}","{1: bytearray(b'/zz.php'), 2: bytearray(b'zachwilson.tech'), 3: bytearray(b'webprosbot'), 4: bytearray(b'iOS'), 6: bytearray(b'zsavi524.techcrf'), 7: bytearray(b'\x88\xb8\x07P;\x03\x06\x00'), 8: bytearray(b""\x00 \xb65\'\x03\x06\x00""), 9: bytearray(b'999535123'), 10: bytearray(b'999956796'), 11: bytearray(b'vivo $2')}",,[4],,0,"Row(browser_family=Row(column_size=48033, value_count=99232, null_value_count=0, nan_value_count=None, lower_bound=') Bot', upper_bound='webprosbot'), device_id=Row(column_size=117090, value_count=99232, null_value_count=0, nan_value_count=None, lower_bound='-100210680', upper_bound='999535123'), device_type=Row(column_size=34955, value_count=99232, null_value_count=0, nan_value_count=None, lower_bound='13 Pro Max', upper_bound='vivo $2'), event_date=Row(column_size=2355, value_count=99232, null_value_count=0, nan_value_count=None, lower_bound=datetime.datetime(2021, 1, 8, 0, 0), upper_bound=datetime.datetime(2023, 8, 18, 0, 0)), event_time=Row(column_size=435861, value_count=99232, null_value_count=0, nan_value_count=None, lower_bound=datetime.datetime(2021, 1, 8, 0, 2, 29, 513000), upper_bound=datetime.datetime(2023, 8, 18, 23, 59, 0, 901000)), host=Row(column_size=3294, 
value_count=99232, null_value_count=0, nan_value_count=None, lower_bound='abhishekanand.te', upper_bound='zsavi524.techcrf'), os_family=Row(column_size=34566, value_count=99232, null_value_count=0, nan_value_count=None, lower_bound='Android', upper_bound='iOS'), referrer=Row(column_size=73756, value_count=99232, null_value_count=49299, nan_value_count=None, lower_bound='""https://www.goo', upper_bound='zachwilson.tech'), url=Row(column_size=145848, value_count=99232, null_value_count=0, nan_value_count=None, lower_bound='""/?""""<?=print(93', upper_bound='/zz.php'), user_id=Row(column_size=344927, value_count=99232, null_value_count=58, nan_value_count=None, lower_bound='-1000370060', upper_bound='999956796'))"
0,s3://warehouse/bootcamp/events_sorted/data/00002-73-bb6b0eaa-2ea0-43ef-919a-09182b9e4425-0-00001.parquet,PARQUET,1,Row(event_date_year=None),93956,1430918,"{1: 351330, 2: 94518, 3: 45585, 4: 27674, 6: 3112, 7: 408578, 8: 2019, 9: 115393, 10: 343502, 11: 34791}","{1: 93956, 2: 93956, 3: 93956, 4: 93956, 6: 93956, 7: 93956, 8: 93956, 9: 93956, 10: 93956, 11: 93956}","{1: 0, 2: 48227, 3: 1, 4: 1, 6: 0, 7: 0, 8: 0, 9: 0, 10: 0, 11: 1}",{},"{1: bytearray(b'""/?""""<?=print(93'), 2: bytearray(b'""https://www.goo'), 3: bytearray(b') Bot'), 4: bytearray(b'Android'), 6: bytearray(b'ablumhardt.techc'), 7: bytearray(b'\x18\xe8_\xb2\xf3\xb7\x05\x00'), 8: bytearray(b'\x00@\x94\xa7\xf3\xb7\x05\x00'), 9: bytearray(b'-1000866068'), 10: bytearray(b'-1000675882'), 11: bytearray(b'ALP-AL00')}","{1: bytearray(b'/zz/address.php@'), 2: bytearray(b'zachwilson.tech'), 3: bytearray(b'webprosbot'), 4: bytearray(b'webOS'), 6: bytearray(b'zzz.techcreator/'), 7: bytearray(b'HE\xdbM\xb3\x03\x06\x00'), 8: bytearray(b'\x00`\xc2\xe8\x9f\x03\x06\x00'), 9: bytearray(b'998961543'), 10: bytearray(b'999956796'), 11: bytearray(b'vivo $2')}",,[4],,0,"Row(browser_family=Row(column_size=45585, value_count=93956, null_value_count=1, nan_value_count=None, lower_bound=') Bot', upper_bound='webprosbot'), device_id=Row(column_size=115393, value_count=93956, null_value_count=0, nan_value_count=None, lower_bound='-1000866068', upper_bound='998961543'), device_type=Row(column_size=34791, value_count=93956, null_value_count=1, nan_value_count=None, lower_bound='ALP-AL00', upper_bound='vivo $2'), event_date=Row(column_size=2019, value_count=93956, null_value_count=0, nan_value_count=None, lower_bound=datetime.datetime(2021, 1, 3, 0, 0), upper_bound=datetime.datetime(2023, 8, 24, 0, 0)), event_time=Row(column_size=408578, value_count=93956, null_value_count=0, nan_value_count=None, lower_bound=datetime.datetime(2021, 1, 3, 0, 3, 1, 119000), upper_bound=datetime.datetime(2023, 8, 24, 23, 8, 20, 509000)), 
host=Row(column_size=3112, value_count=93956, null_value_count=0, nan_value_count=None, lower_bound='ablumhardt.techc', upper_bound='zzz.techcreator/'), os_family=Row(column_size=27674, value_count=93956, null_value_count=1, nan_value_count=None, lower_bound='Android', upper_bound='webOS'), referrer=Row(column_size=94518, value_count=93956, null_value_count=48227, nan_value_count=None, lower_bound='""https://www.goo', upper_bound='zachwilson.tech'), url=Row(column_size=351330, value_count=93956, null_value_count=0, nan_value_count=None, lower_bound='""/?""""<?=print(93', upper_bound='/zz/address.php@'), user_id=Row(column_size=343502, value_count=93956, null_value_count=0, nan_value_count=None, lower_bound='-1000675882', upper_bound='999956796'))"
0,s3://warehouse/bootcamp/events_sorted/data/00003-74-bb6b0eaa-2ea0-43ef-919a-09182b9e4425-0-00001.parquet,PARQUET,1,Row(event_date_year=None),122235,1661339,"{1: 290056, 2: 98606, 3: 63591, 4: 41728, 6: 3551, 7: 512117, 8: 2154, 9: 148134, 10: 450920, 11: 45563}","{1: 122235, 2: 122235, 3: 122235, 4: 122235, 6: 122235, 7: 122235, 8: 122235, 9: 122235, 10: 122235, 11: 122235}","{1: 0, 2: 53009, 3: 0, 4: 0, 6: 0, 7: 0, 8: 0, 9: 0, 10: 8, 11: 0}",{},"{1: bytearray(b'/'), 2: bytearray(b'3.220.57.224'), 3: bytearray(b') Bot'), 4: bytearray(b'Android'), 6: bytearray(b'accc.techcreator'), 7: bytearray(b'@n.\xbd\xdf\xb7\x05\x00'), 8: bytearray(b'\x00\xe0\xbc\x89\xdf\xb7\x05\x00'), 9: bytearray(b'-1001669954'), 10: bytearray(b'-1000015881'), 11: bytearray(b'$2')}","{1: bytearray(b'/zz.php'), 2: bytearray(b'zachwilson.tech'), 3: bytearray(b'webprosbot'), 4: bytearray(b'iOS'), 6: bytearray(b'zachwilson.techd'), 7: bytearray(b'\xd8\xaf\x9a\xe8\x9f\x03\x06\x00'), 8: bytearray(b'\x00\x00\xeb\xca\x8b\x03\x06\x00'), 9: bytearray(b'998766634'), 10: bytearray(b'999882344'), 11: bytearray(b'vivo $2')}",,[4],,0,"Row(browser_family=Row(column_size=63591, value_count=122235, null_value_count=0, nan_value_count=None, lower_bound=') Bot', upper_bound='webprosbot'), device_id=Row(column_size=148134, value_count=122235, null_value_count=0, nan_value_count=None, lower_bound='-1001669954', upper_bound='998766634'), device_type=Row(column_size=45563, value_count=122235, null_value_count=0, nan_value_count=None, lower_bound='$2', upper_bound='vivo $2'), event_date=Row(column_size=2154, value_count=122235, null_value_count=0, nan_value_count=None, lower_bound=datetime.datetime(2021, 1, 2, 0, 0), upper_bound=datetime.datetime(2023, 8, 23, 0, 0)), event_time=Row(column_size=512117, value_count=122235, null_value_count=0, nan_value_count=None, lower_bound=datetime.datetime(2021, 1, 2, 0, 14, 23, 80000), upper_bound=datetime.datetime(2023, 8, 23, 23, 59, 57, 399000)), host=Row(column_size=3551, 
value_count=122235, null_value_count=0, nan_value_count=None, lower_bound='accc.techcreator', upper_bound='zachwilson.techd'), os_family=Row(column_size=41728, value_count=122235, null_value_count=0, nan_value_count=None, lower_bound='Android', upper_bound='iOS'), referrer=Row(column_size=98606, value_count=122235, null_value_count=53009, nan_value_count=None, lower_bound='3.220.57.224', upper_bound='zachwilson.tech'), url=Row(column_size=290056, value_count=122235, null_value_count=0, nan_value_count=None, lower_bound='/', upper_bound='/zz.php'), user_id=Row(column_size=450920, value_count=122235, null_value_count=8, nan_value_count=None, lower_bound='-1000015881', upper_bound='999882344'))"
