# 1. Prepare data

Test the writing and reading performance on GeoLake.


We are going to use the Porto Taxi dataset, which has 2 million records. You can find it here: https://star.cs.ucr.edu/?portotaxi#center=41.1636,-8.5872&zoom=13

In [None]:
%%shell

# Stream the gzip-compressed GeoJSON from the server and decompress it on the fly into the data directory.
wget -O - https://star.cs.ucr.edu/datasets/portotaxi/download.geojson.gz | gzip -dc > /home/iceberg/data/portotaxi.geojson

In [None]:
%%shell

# Show the decompressed file with a human-readable size to confirm the download.
ls -l -h /home/iceberg/data/portotaxi.geojson

In [None]:
%%init_spark

# The file is large (3.5 GB); only run this cell if the machine has enough memory.

# A single executor with 8 cores.
launcher.num_executors = 1
launcher.executor_cores = 8

# 16g each for the driver and the executor.
launcher.driver_memory = "16g"
launcher.executor_memory = "16g"

# Allow up to 4g of results to be returned to the driver.
launcher.conf.set("spark.driver.maxResultSize", "4g")

In [None]:
%%shell

# If there is not enough memory, split the file and read only the first 10k records.
# Each step runs only if the previous one succeeded.
pip install geojsplit \
  && cd /home/iceberg/data/ \
  && geojsplit -n 1 --geometry-count 10000 portotaxi.geojson \
  && ls -lh /home/iceberg/data/

In [None]:
import org.wololo.geojson.GeoJSONFactory
import org.wololo.jts2geojson.GeoJSONReader
import org.wololo.geojson.Feature
import org.apache.spark.sql.DataFrame
import com.fasterxml.jackson.databind.ObjectMapper

import spark.sqlContext.implicits._


/**
 * Reads a GeoJSON file into a DataFrame with one row per feature.
 *
 * The whole file is collected to the driver and parsed there, so it has to fit in driver
 * memory. Parsing is best effort: a feature that cannot be converted is skipped, and the
 * number of skipped features is printed so that data loss does not go unnoticed.
 *
 * @param filePath path of the GeoJSON file, passed to `spark.read.textFile`
 * @return a DataFrame with the columns TRIP_ID, CALL_TYPE, ORIGIN_STAND, TAXI_ID, TIMESTAMP,
 *         DAY_TYPE, MISSING_DATA and geometry
 * @throws IllegalArgumentException if the document has no top-level "features" member
 */
def readGeojson(filePath: String): DataFrame = {
    import scala.util.control.NonFatal

    val colnames = Seq("TRIP_ID", "CALL_TYPE", "ORIGIN_STAND", "TAXI_ID", "TIMESTAMP", "DAY_TYPE", "MISSING_DATA",  "geometry")

    // The lines of the file are concatenated (without a separator) into a single JSON document.
    val geoJsonString = spark.read.textFile(filePath).collect().mkString
    val mapper = new ObjectMapper()
    val featuresNode = mapper.readTree(geoJsonString).get("features")
    if (featuresNode == null) {
        throw new IllegalArgumentException(s"No 'features' member found in GeoJSON file: ${filePath}")
    }

    // Accumulate into a builder: appending to an immutable Seq with :+ copies the whole
    // sequence on every append, which is quadratic in the number of features.
    val parsed = Vector.newBuilder[Feature]
    var skipped = 0
    val it = featuresNode.iterator()
    while (it.hasNext) {
        val nextFea = it.next()
        try {
            parsed += GeoJSONFactory.create(nextFea.toString).asInstanceOf[Feature]
        } catch {
            // Best effort: skip the feature, but keep count instead of swallowing silently.
            case NonFatal(_) => skipped += 1
        }
    }
    if (skipped > 0) {
        println(s"readGeojson: skipped ${skipped} feature(s) that could not be parsed from ${filePath}")
    }

    // One reader is enough for all features; the conversion runs sequentially on the driver.
    val reader = new GeoJSONReader
    val rows = parsed.result().map { feature =>
        val props = feature.getProperties
        (
            props.get("TRIP_ID").asInstanceOf[Long],
            props.get("CALL_TYPE").asInstanceOf[String],
            props.get("ORIGIN_STAND").asInstanceOf[String],
            props.get("TAXI_ID").asInstanceOf[Integer],
            props.get("TIMESTAMP").asInstanceOf[String],
            props.get("DAY_TYPE").asInstanceOf[String],
            props.get("MISSING_DATA").asInstanceOf[Boolean],
            reader.read(feature.getGeometry)
        )
    }
    sc.parallelize(rows).toDF(colnames: _*)
}


In [None]:
// GeoJSON file to load; change to "portotaxi.geojson" if you have enough memory.
val fullFile = "/home/iceberg/data/portotaxi_xaaaa.geojson"

// Parse the file, spread it over 10 partitions, mark it for caching and expose it to SQL as "portotaxi".
val df = readGeojson(fullFile).repartition(10).cache()
df.createOrReplaceTempView("portotaxi")

In [None]:
// Count the records available through the "portotaxi" temp view.
val countQuery = "SELECT * FROM portotaxi"
spark.sql(countQuery).count()

# 2. Benchmark of Parquet Encodings

## Create Tables

Create one table for each geometry encoding.

In [None]:
// Geometry encodings under test and the table that holds each one (paired by position).
val geoEncodings = Seq("nested-list", "wkb-bbox", "wkb")
val tables = Seq("portotaxi_nested_list", "portotaxi_wkb_bbox", "portotaxi_wkb")

// Create one Iceberg table per encoding; the encoding is selected through a table property.
for ((tableName, encoding) <- tables.zip(geoEncodings)) {
    val createStmt = s"""CREATE TABLE IF NOT EXISTS demo.db.${tableName}
    (
      TRIP_ID LONG,
      CALL_TYPE STRING,
      ORIGIN_STAND STRING,
      TAXI_ID INTEGER,
      TIMESTAMP STRING,
      DAY_TYPE STRING,
      MISSING_DATA BOOLEAN,
      geometry GEOMETRY
    )
    USING iceberg
    TBLPROPERTIES ('write.parquet.geometry.encoding' = '${encoding}')
    """
    // Echo the statement before running it.
    println(createStmt)
    spark.sql(createStmt)
}

## Writing



In [None]:
// Copy the whole "portotaxi" view into each table and print the wall-clock time of every insert.
for (tb <- tables) {
    val startMs = System.currentTimeMillis()
    spark.sql(s"INSERT INTO demo.db.${tb} SELECT * FROM portotaxi")
    val elapsedSec = (System.currentTimeMillis() - startMs) / 1000.0
    println(s"time cost on table ${tb}: ${elapsedSec}s")
}

In [None]:
// One sub-query per table over its "snapshots" metadata table, reporting the record count
// and the total file size converted from bytes to MB; the sub-queries are combined with UNION.
val sql = tables.map { t =>
s"""
(SELECT '${t}' as table, 
    summary['total-records'] as total_records,
    round(summary['total-files-size'] / 1024 / 1024, 2) as file_size_in_mb
 FROM demo.db.${t}.snapshots)
"""
}.mkString(" UNION ")
spark.sql(sql).show()

## Reading

In [None]:
// Query polygon (WKT) used as the spatial filter in the read benchmarks.
val bbox = "POLYGON ((-8.6079 41.1489, -8.6089 41.1472, -8.6066 41.1470, -8.6061 41.1483, -8.6079 41.1489))"

// Count the geometries within the polygon in each table and print the wall-clock time per query.
for (t <- tables) {
    val query = s"SELECT count(*) FROM demo.db.${t} WHERE ST_Within(geometry, IcebergSTGeomFromText('${bbox}'))"
    val startMs = System.currentTimeMillis()
    spark.sql(query).show()
    val elapsedSec = (System.currentTimeMillis() - startMs) / 1000.0
    println(s"time cost on table ${t}: ${elapsedSec}s")
}

# 3. Benchmark of Partitions


Create tables with different partition resolutions: 3, 7, 11, 15, 19.

In [None]:
// XZ2 partition resolutions to compare.
val resolutions = Seq(3, 7, 11, 15, 19)

// For every resolution: create a table partitioned by xz2(geometry, resolution), then time
// only the insert of the whole "portotaxi" view (table creation is outside the timed span).
for (r <- resolutions) {
    val createStmt = s"""
    CREATE TABLE IF NOT EXISTS demo.db.portotaxi_xz${r}
    (
      TRIP_ID LONG,
      CALL_TYPE STRING,
      ORIGIN_STAND STRING,
      TAXI_ID INTEGER,
      TIMESTAMP STRING,
      DAY_TYPE STRING,
      MISSING_DATA BOOLEAN,
      geometry GEOMETRY
    )
    USING iceberg
    PARTITIONED BY (xz2(geometry, ${r}))
    TBLPROPERTIES ('write.parquet.geometry.encoding' = 'nested-list')    
    """
    spark.sql(createStmt)

    val startMs = System.currentTimeMillis()
    spark.sql(s"INSERT INTO demo.db.portotaxi_xz${r} SELECT * FROM portotaxi")
    val elapsedSec = (System.currentTimeMillis() - startMs) / 1000.0
    println(s"time cost on table demo.db.portotaxi_xz${r}: ${elapsedSec}s")
}


Number of partitions and data files in each table:

In [None]:
// One sub-query per resolution over the table's "snapshots" metadata table, reading the
// changed-partition count and the total number of data files from the snapshot summary.
val sql = resolutions.map { r =>
    s"(SELECT ${r} as resolution, " +
        "summary['changed-partition-count'] as partitions," +
        "summary['total-data-files'] as total_data_files " +
        s"FROM demo.db.portotaxi_xz${r}.snapshots)"
}.mkString(" UNION ") + " ORDER BY resolution"

spark.sql(sql).show()

Reading speed:

In [None]:
// Count the geometries within the query polygon in each partitioned table and print the
// wall-clock time per query. The polygon is built with IcebergSTGeomFromText so that this
// query has the same shape as the one used for the encoding read benchmark.
resolutions.foreach(t => {
    val t0 = System.currentTimeMillis()
    spark.sql(s"SELECT count(*) FROM demo.db.portotaxi_xz${t} WHERE ST_Within(geometry, IcebergSTGeomFromText('${bbox}'))").show()
    val t1 = System.currentTimeMillis()
    println(s"time cost on resolutions ${t}: ${(t1 - t0) / 1000.0}s")
})