### Data Skipping and Z-Ordering
Delta Lake 1.2.0 introduced data skipping, which relies on column-level statistics such as min/max values. These statistics are saved in the Delta Lake transaction log (DeltaLog) with every `add` action, that is, each time a new Parquet file is added to the table.
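
To make the shape of these statistics concrete, here is a minimal sketch that decodes one `add` action as it might appear in a DeltaLog commit file. The log line, the file path, and all values are hypothetical; only the field names (`stats`, `numRecords`, `minValues`, `maxValues`, `nullCount`) follow the Delta transaction log protocol. `read_stats` is a helper name introduced here.

```python
import json

# Hypothetical commit line; the path and all values are made up for illustration.
log_line = json.dumps({
    "add": {
        "path": "part-00000-example.snappy.parquet",
        "partitionValues": {},
        "size": 1024,
        "modificationTime": 0,
        "dataChange": True,
        # In the Delta protocol, `stats` is itself a JSON-encoded string.
        "stats": json.dumps({
            "numRecords": 3,
            "minValues": {"src_port": 23, "dst_port": 80},
            "maxValues": {"src_port": 65520, "dst_port": 61511},
            "nullCount": {"src_port": 0, "dst_port": 0},
        }),
    }
})


def read_stats(line: str) -> dict:
    """Return the decoded per-file statistics of an `add` action."""
    action = json.loads(line)["add"]
    return json.loads(action["stats"])


stats = read_stats(log_line)
print(stats["minValues"]["src_port"], stats["maxValues"]["src_port"])
```

Note the double decoding: the commit line is JSON, and the `stats` field inside it is a JSON string that has to be parsed again.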
Using these min-max ranges, Delta Lake can skip Parquet files whose ranges fall entirely outside the values a query filters on (data skipping). To make this effective, data can be clustered by Z-Order columns so that the min-max ranges are narrow and, ideally, non-overlapping.
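
The clustering idea can be illustrated with a toy Z-order (Morton) curve: interleaving the bits of two column values yields a single sort key that keeps rows that are close in both columns near each other. This is a sketch of the general technique on a hypothetical 4x4 grid, not Delta Lake's actual implementation; `interleave_bits` and `file_ranges` are names invented for this example.

```python
def interleave_bits(x: int, y: int, bits: int = 16) -> int:
    """Interleave the low `bits` bits of x and y into one Morton key."""
    z = 0
    for i in range(bits):
        z |= ((x >> i) & 1) << (2 * i)       # x bits go to even positions
        z |= ((y >> i) & 1) << (2 * i + 1)   # y bits go to odd positions
    return z


def file_ranges(rows, rows_per_file):
    """Sort rows by Morton key, cut them into files, return per-file ranges."""
    ordered = sorted(rows, key=lambda r: interleave_bits(*r))
    ranges = []
    for start in range(0, len(ordered), rows_per_file):
        chunk = ordered[start:start + rows_per_file]
        xs = [x for x, _ in chunk]
        ys = [y for _, y in chunk]
        ranges.append(((min(xs), max(xs)), (min(ys), max(ys))))
    return ranges


# Hypothetical data: every (x, y) pair on a 4x4 grid, written as 4 "files".
grid = [(x, y) for x in range(4) for y in range(4)]
for x_range, y_range in file_ranges(grid, rows_per_file=4):
    print("x:", x_range, "y:", y_range)
```

In this toy setup each of the four files covers a narrow range in both columns, so a filter on either column alone can rule out half of the files.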

### Delta Table Structure

You are working at a cybersecurity company. Your team collects traffic data produced by an open source network traffic analyzer. The schema is straightforward:

In [1]:
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip


builder = SparkSession.builder.appName('CreateDeltaTables') \
    .config(
        'spark.jars.packages',
        'io.delta:delta-core_2.12:2.2.0') \
    .config(
        'spark.sql.extensions',
        'io.delta.sql.DeltaSparkSessionExtension') \
    .config(
        'spark.sql.catalog.spark_catalog',
        'org.apache.spark.sql.delta.catalog.DeltaCatalog')

spark = configure_spark_with_delta_pip(builder).getOrCreate()

security = spark.read \
    .format('csv') \
    .option('header', 'true') \
    .option('inferSchema', 'true') \
    .load('../../data/security.csv')
security.show(n=5, truncate=False)



+---------------+--------+--------------+--------+
|src_ip         |src_port|dst_ip        |dst_port|
+---------------+--------+--------------+--------+
|99.209.152.133 |55025   |85.150.136.248|61511   |
|159.155.100.124|26766   |62.103.1.70   |55074   |
|235.89.179.185 |3232    |0.212.11.209  |65246   |
|33.182.243.225 |21905   |54.115.186.133|22622   |
|54.36.25.7     |23061   |44.97.51.98   |46932   |
+---------------+--------+--------------+--------+
only showing top 5 rows



The structure of the table is as follows:

In [2]:
%%bash

tree -a ../../data/security-table

../../data/security-table
├── .part-00000-0dd07884-23fd-4258-9dbe-4dc1fbbd4db0-c000.snappy.parquet.crc
├── .part-00000-3c05d64a-5704-4a88-957c-bf7fcf9ffc99-c000.snappy.parquet.crc
├── .part-00000-52260cd2-35e6-4696-bca5-c9dcd189d360-c000.snappy.parquet.crc
├── .part-00000-6523dea4-6fcb-432b-af00-59a82e6f5c6c-c000.snappy.parquet.crc
├── .part-00000-678af661-e81d-42c1-b065-d034c529c113-c000.snappy.parquet.crc
├── .part-00000-75a525fb-e06d-4ff4-8dfb-95be7ceee9e1-c000.snappy.parquet.crc
├── .part-00000-7c3ad93f-81b2-4e9b-8aa2-e2ad7c5a10b4-c000.snappy.parquet.crc
├── .part-00000-7df7863c-c3a3-484e-bd50-7d3d380b1e43-c000.snappy.parquet.crc
├── .part-00000-faf12ec5-a21c-4cf6-9867-614b6c9a814d-c000.snappy.parquet.crc
├── .part-00000-fe28cdca-f5d8-4e75-8189-5aeeece80b84-c000.snappy.parquet.crc
├── _delta_log
│   ├── .00000000000000000000.json.crc
│   ├── .0000000000000

### Filter Clauses

 Suppose we are only interested in the traffic which satisfies the following conditions:

 - 128.0.0.0 <= `src_ip` <= 191.255.255.255
 - 1024 <= `src_port` <= 65535
 - 128.0.0.0 <= `dst_ip` <= 191.255.255.255 and 1024 <= `dst_port` <= 65535
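
As a sketch, each bullet can be written as a row-level predicate. This assumes that the three bullets are separate queries and that the IP ranges are meant numerically. Because `src_ip` and `dst_ip` are stored as strings, a plain string comparison would be lexicographic and can give a different answer, as the last line shows. All helper names are illustrative.

```python
from ipaddress import IPv4Address

IP_LO, IP_HI = IPv4Address("128.0.0.0"), IPv4Address("191.255.255.255")
PORT_LO, PORT_HI = 1024, 65535


def ip_in_range(ip: str) -> bool:
    """Numeric IPv4 range check (assumption: ranges are numeric)."""
    return IP_LO <= IPv4Address(ip) <= IP_HI


def port_in_range(port: int) -> bool:
    return PORT_LO <= port <= PORT_HI


# One predicate per bullet; each takes a row as a dict keyed by column name.
def by_src_ip(row: dict) -> bool:
    return ip_in_range(row["src_ip"])


def by_src_port(row: dict) -> bool:
    return port_in_range(row["src_port"])


def by_dst(row: dict) -> bool:
    return ip_in_range(row["dst_ip"]) and port_in_range(row["dst_port"])


# Second sample row from the table shown earlier.
row = {"src_ip": "159.155.100.124", "src_port": 26766,
       "dst_ip": "62.103.1.70", "dst_port": 55074}
print(by_src_ip(row), by_src_port(row), by_dst(row))

# String comparison is lexicographic, so "13.0.0.0" looks in range as text
# even though it is outside the numeric range.
print("128.0.0.0" <= "13.0.0.0" <= "191.255.255.255", ip_in_range("13.0.0.0"))
```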

### Partition by Range (Explicit Sorting)
 
Because our data is randomly generated, there are no correlations between columns to exploit. So let's try explicitly sorting the data by range before writing it.

In [4]:
df = spark.read \
    .format('delta') \
    .load('../../data/security-table')
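# Alternative, left disabled: a global sort followed by a plain repartition.
# df.sort('src_ip', 'src_port', 'dst_ip', 'dst_port') \
#     .repartition(10) \
# Range-partition into 10 partitions on the filter columns so that each
# output file covers a contiguous key range.
df.repartitionByRange(10, 'src_ip', 'src_port', 'dst_ip') \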
    .write \
    .mode('overwrite') \
    .format('delta') \
    .save('../../data/security-table-part-by-range')
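
Conceptually, range partitioning sorts rows by the chosen key and cuts the result into contiguous slices, so that each output file owns a distinct key range. The sketch below imitates this with an exact sort in plain Python on hypothetical rows. Spark's `repartitionByRange` estimates the range boundaries by sampling, so real partitions are only approximately equal in size. `partition_by_range` is a name introduced here.

```python
import random


def partition_by_range(rows, num_partitions, key):
    """Sort rows by key and split them into contiguous, roughly equal slices."""
    ordered = sorted(rows, key=key)
    size = -(-len(ordered) // num_partitions)  # ceiling division
    return [ordered[i:i + size] for i in range(0, len(ordered), size)]


random.seed(0)
# Hypothetical (src_port, dst_port) rows standing in for the traffic data.
rows = [(random.randint(1, 65535), random.randint(1, 65535)) for _ in range(1000)]

parts = partition_by_range(rows, num_partitions=10, key=lambda r: r)
for i, part in enumerate(parts):
    src_ports = [src for src, _ in part]
    print(f"partition {i}: src_port min={min(src_ports)} max={max(src_ports)}")
```

Only the leading sort column ends up with narrow, non-overlapping ranges. A secondary column is ordered only among rows that tie on the first one, so within each slice it can still span most of its domain.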


The structure of the table is as follows:

In [5]:
%%bash

tree -a ../../data/security-table-part-by-range

../../data/security-table-part-by-range
├── .part-00000-3cd43b0a-8956-42f9-9123-00163940975d-c000.snappy.parquet.crc
├── .part-00000-9edff352-1332-43ac-8c90-110d2386fbe3-c000.snappy.parquet.crc
├── .part-00001-06a98e77-be71-4f87-b7dd-b7632751d433-c000.snappy.parquet.crc
├── .part-00001-1f022d03-8826-4c34-a556-0add96827a04-c000.snappy.parquet.crc
├── .part-00002-15d9b2e3-c7a9-4f95-8c35-9b2eb073fc1d-c000.snappy.parquet.crc
├── .part-00002-29a58daa-6f11-4c5c-8411-166fb4915c7e-c000.snappy.parquet.crc
├── .part-00003-4dc0414e-ea7d-4095-9d44-a4ee56384a55-c000.snappy.parquet.crc
├── .part-00003-f9f40697-734a-4781-b1a2-ae69a3a55664-c000.snappy.parquet.crc
├── .part-00004-00aff318-cf29-4f80-b34c-bc519268f8d4-c000.snappy.parquet.crc
├── .part-00004-f20d075d-f39a-475d-96df-343e88b625c8-c000.snappy.parquet.crc
├── .part-00005-0267e0b3-5ed8-4bcc-a513-98b9ffa00003-c000.snappy.parquet.crc
├─

### Skipping Effectiveness

Now, let's inspect the skipping effectiveness. Your end goal is likely to minimize both the total time spent running these queries and the egress cost. For illustration purposes, however, let's define our cost function as the total number of records scanned:

In [6]:
from deltalake import DeltaTable
import pyarrow.dataset as ds


dt = DeltaTable('../../data/security-table-part-by-range')
dt.get_add_actions().to_pandas()[['min', 'max']]

| | min | max |
|---|---|---|
| 0 | {'src_ip': '0.112.77.223', 'src_port': 23, 'ds... | {'src_ip': '123.197.48.105', 'src_port': 65520... |
| 1 | {'src_ip': '123.203.100.246', 'src_port': 78, ... | {'src_ip': '143.77.124.105', 'src_port': 65376... |
| 2 | {'src_ip': '143.85.134.171', 'src_port': 74, '... | {'src_ip': '167.95.74.29', 'src_port': 65530, ... |
| 3 | {'src_ip': '168.1.239.225', 'src_port': 9, 'ds... | {'src_ip': '191.219.83.12', 'src_port': 65487,... |
| 4 | {'src_ip': '191.224.71.75', 'src_port': 69, 'd... | {'src_ip': '213.68.64.41', 'src_port': 65514, ... |
| 5 | {'src_ip': '213.71.241.101', 'src_port': 31, '... | {'src_ip': '238.255.165.57', 'src_port': 65518... |
| 6 | {'src_ip': '238.31.73.179', 'src_port': 1, 'ds... | {'src_ip': '3.57.196.112', 'src_port': 65392, ... |
| 7 | {'src_ip': '3.6.149.206', 'src_port': 53, 'dst... | {'src_ip': '51.49.25.156', 'src_port': 65517, ... |
| 8 | {'src_ip': '51.51.81.24', 'src_port': 64, 'dst... | {'src_ip': '77.102.222.156', 'src_port': 65470... |
| 9 | {'src_ip': '77.102.66.140', 'src_port': 48, 'd... | {'src_ip': '99.99.188.7', 'src_port': 65525, '... |
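
Using only the `src_ip` bounds listed above, the following sketch checks which files a min-max comparison could rule out for the first filter (128.0.0.0 <= `src_ip` <= 191.255.255.255). It assumes `src_ip` is compared as a string, which matches how the statistics above are ordered. `may_contain` is a helper name introduced here, not a Delta Lake API.

```python
# (min, max) of src_ip per file, copied from the add-action statistics above.
SRC_IP_RANGES = [
    ("0.112.77.223", "123.197.48.105"),
    ("123.203.100.246", "143.77.124.105"),
    ("143.85.134.171", "167.95.74.29"),
    ("168.1.239.225", "191.219.83.12"),
    ("191.224.71.75", "213.68.64.41"),
    ("213.71.241.101", "238.255.165.57"),
    ("238.31.73.179", "3.57.196.112"),
    ("3.6.149.206", "51.49.25.156"),
    ("51.51.81.24", "77.102.222.156"),
    ("77.102.66.140", "99.99.188.7"),
]

LO, HI = "128.0.0.0", "191.255.255.255"


def may_contain(file_min: str, file_max: str, lo: str, hi: str) -> bool:
    """A file must be read only if its [min, max] interval overlaps [lo, hi]."""
    return file_min <= hi and file_max >= lo


scanned = [i for i, (mn, mx) in enumerate(SRC_IP_RANGES) if may_contain(mn, mx, LO, HI)]
skipped = [i for i in range(len(SRC_IP_RANGES)) if i not in scanned]
print("files to scan:", scanned)
print("files skipped:", skipped)
```

Under the string-comparison assumption, four of the ten files overlap the requested range, so the remaining six could be skipped on `src_ip` alone.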
