# Xskipper - Extensible Data Skipping Framework for Apache Spark

In this notebook, we demonstrate how to use the Xskipper Scala API.

Data skipping can significantly boost the performance of SQL queries by skipping over irrelevant data objects or files based on summary metadata associated with each object.  
For every column in the object, the summary metadata might include minimum and maximum values, a list or bloom filter of the appearing values, or other metadata which succinctly represents the data in that column. This metadata is used during query evaluation to skip over objects which have no relevant data.
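To make the mechanism concrete, here is a minimal sketch of min/max-based skipping in plain Scala, without Spark or Xskipper. The `ObjectSummary` type and the `mightContainLessThan` function are hypothetical names chosen for illustration, and the two objects mirror the sample dataset used later in this notebook. The sketch also checks that skipping returns the same rows as a full scan.

```scala
// Hypothetical per-object summary metadata for one numeric column.
final case class ObjectSummary(name: String, min: Double, max: Double)

// For the predicate `col < bound`, an object can hold a matching row only
// if its minimum is below the bound; otherwise it can be skipped unread.
def mightContainLessThan(s: ObjectSummary, bound: Double): Boolean =
  s.min < bound

// Each "object" holds the temp values of one file (mirrors the sample data).
val objects: Map[String, Seq[Double]] = Map(
  "dt=2017-07-07" -> Seq(20.0),
  "dt=2017-07-08" -> Seq(30.0)
)

// Build the summaries once; this plays the role of indexing.
val summaries: Seq[ObjectSummary] = objects.toSeq.map { case (name, values) =>
  ObjectSummary(name, values.min, values.max)
}

// Query: temp < 30. Keep only objects whose summary cannot rule out a match.
val bound = 30.0
val toRead: Set[String] =
  summaries.filter(mightContainLessThan(_, bound)).map(_.name).toSet

// Skipping must not change the answer: compare against a full scan.
val fullScan = objects.values.flatten.filter(_ < bound).toSeq.sorted
val withSkipping = objects
  .filter { case (name, _) => toRead.contains(name) }
  .values.flatten.filter(_ < bound).toSeq.sorted

println(s"objects to read: $toRead, matching rows: $withSkipping")
```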

To use this feature, you need to create indexes on one or more columns of the data set. After this is done, Spark SQL queries can benefit from data skipping. In general, you should index the columns which are queried most often in the WHERE clause.

Note that all Spark native data formats are supported, including Parquet, ORC, CSV, JSON and Avro. Data skipping is a performance optimization feature, which means that using data skipping does not affect the content of the query results.

## Table of Contents

- [Setup](#setup)
- [Indexing a Dataset](#indexing_dataset)
- [Index Usage](#using)
- [Index Life Cycle](#index_life_cycle)
- [Working with Hive Tables](#hive_tables)

## Setup <a id='setup'></a>

In this example, we will set a JVM-wide parameter to a base path under which all of the indexes are stored.  
The metadata can be stored on the same storage system as the data, but not under the same path.
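One way to read the constraint above is that the metadata path must not be nested inside the dataset path. The sketch below checks this with a hypothetical helper, `isUnderPath`, which is not part of Xskipper; the bucket name is a placeholder.

```scala
// Hypothetical helper (not part of Xskipper): true when `child` equals
// `parent` or lies underneath it. Paths are compared segment by segment,
// so ".../data2" is not mistaken for a subpath of ".../data".
def isUnderPath(child: String, parent: String): Boolean = {
  def segments(path: String): Seq[String] = path.stripSuffix("/").split("/").toSeq
  val c = segments(child)
  val p = segments(parent)
  c.length >= p.length && c.take(p.length) == p
}

val exampleBucket = "my-bucket" // placeholder bucket name
val dataLocation = s"cos://$exampleBucket.service/data"
val mdLocation = s"cos://$exampleBucket.service/metadata"

// Same bucket is fine; nesting the metadata under the data path is not.
require(!isUnderPath(mdLocation, dataLocation),
  "metadata must not be stored under the dataset path")
```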

In the following examples we will use [IBM Cloud Object Storage](https://www.ibm.com/il-en/cloud/object-storage) to store both the data and the metadata.  
(The cells below assume the credentials were already set. For more information about setting credentials when using IBM Cloud Object Storage, see [Stocator Storage Connector](https://github.com/CODAIT/stocator).)

During query time, the metadata for the dataset will be looked up in this location.

For more configuration options, see [Data skipping configuration options](https://xskipper.io/api/configuration/configuration/).

```scala
// Requires the Xskipper jar on the classpath
import io.xskipper._
import io.xskipper.implicits._

// TODO: change to the name of your IBM Cloud Object Storage bucket
val bucket = "<your-bucket>"

// The base location to store all metadata
// TODO: change to your metadata location
val md_base_location = s"cos://${bucket}.service/metadata"

// Configure the JVM-wide parameters, including the
// identifier class for IBM Cloud Object Storage
val conf = Map(
  "io.xskipper.parquet.mdlocation" -> md_base_location,
  "io.xskipper.parquet.mdlocation.type" -> "EXPLICIT_BASE_PATH_LOCATION",
  "io.xskipper.identifierclass" -> "io.xskipper.utils.identifier.IBMCOSIdentifier")
Xskipper.setConf(conf)
```



## Indexing a dataset <a id='indexing_dataset'></a>

### Creating a sample dataset <a id='sample_dataset'></a>

First, let's create a sample dataset that will be used throughout this notebook.

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}

// TODO: change to your data location
val dataset_location = s"cos://${bucket}.service/data"

val schema = List(
  StructField("dt", StringType, true),
  StructField("temp", DoubleType, true),
  StructField("city", StringType, true),
  StructField("vid", StringType, true)
)

val data = Seq(
  Row("2017-07-07", 20.0, "Tel-Aviv", "a"),
  Row("2017-07-08", 30.0, "Jerusalem", "b")
)

val ds = spark.createDataFrame(
  spark.sparkContext.parallelize(data),
  StructType(schema)
)

// use partitionBy to make sure we have two objects
ds.write.partitionBy("dt").mode("overwrite").parquet(dataset_location)

// read the dataset back from storage
val reader = spark.read.format("parquet")
val df = reader.load(dataset_location)
df.show(false)
```

```
+----+---------+---+----------+
|temp|city     |vid|dt        |
+----+---------+---+----------+
|30.0|Jerusalem|b  |2017-07-08|
|20.0|Tel-Aviv |a  |2017-07-07|
+----+---------+---+----------+
```



### Indexing <a id='indexing'></a>

When creating a data skipping index on a data set, first decide which columns to index, then choose an index type for each column. These choices are workload and data dependent. Typically, choose columns to which predicates are applied in many queries.

The following index types are supported out of the box:

1. Min/max – stores the minimum and maximum values for a column. Applies to all types except complex types.
2. Value list – stores the list of values appearing in a column. Applies to all types except complex types.
3. Bloom filter – stores a Bloom filter of the values. Applies to ByteType, StringType, LongType, IntegerType, and ShortType.

- Choose value list if the number of distinct values in an object is typically much smaller than the total number of values in that object.
- Bloom filters are recommended for columns with high cardinality (on such columns, a value list index can grow as large as the column itself).
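The trade-off between value lists and Bloom filters can be sketched in plain Scala. This is not Xskipper's implementation: `TinyBloomFilter` is a hypothetical, minimal Bloom filter that derives its bit positions by double hashing (MurmurHash3 combined with `String.hashCode`), and the bit and hash counts are arbitrary. A value list grows with the number of distinct values, whereas a Bloom filter has a fixed size and may return false positives but never false negatives.

```scala
import scala.util.hashing.MurmurHash3

// Minimal Bloom filter sketch; numBits and numHashes are arbitrary choices.
final class TinyBloomFilter(numBits: Int, numHashes: Int) {
  private val bits = new java.util.BitSet(numBits)

  // Double hashing: position_i = h1 + i * h2 (mod numBits).
  private def positions(value: String): Seq[Int] = {
    val h1 = MurmurHash3.stringHash(value)
    val h2 = value.hashCode
    (0 until numHashes).map(i => Math.floorMod(h1 + i * h2, numBits))
  }

  def add(value: String): Unit = positions(value).foreach(i => bits.set(i))

  // false => definitely absent (the object can be skipped);
  // true  => possibly present (the object must be read).
  def mightContain(value: String): Boolean = positions(value).forall(i => bits.get(i))
}

// Value list index for one object: the exact set of distinct values.
val cityValueList: Set[String] = Set("Tel-Aviv")
// For `city IN ('Jerusalem', 'Ramat-Gan')`, skip if no queried value appears.
val cityCanSkip = Seq("Jerusalem", "Ramat-Gan").forall(v => !cityValueList.contains(v))

// Bloom filter index for one object: fixed size however many values are added.
val vidFilter = new TinyBloomFilter(numBits = 1024, numHashes = 3)
Seq("a").foreach(vidFilter.add)
val vidMightMatch = vidFilter.mightContain("a")
```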

Xskipper also enables you to create your own data skipping indexes and specify how to use them during query time. For more details, see [here](https://xskipper-io.github.io/xskipper/api/creating-new-plugin/).

```scala
// create Xskipper instance for the sample dataset
val xskipper = new Xskipper(spark, dataset_location)

// remove existing index if needed
if (xskipper.isIndexed()) {
  xskipper.dropIndex()
}

xskipper.indexBuilder()
        .addMinMaxIndex("temp")
        .addValueListIndex("city")
        .addBloomFilterIndex("vid")
        .build(reader)
        .show(false)
```

```
+-------+-----------------+-------------------+
|status |new_entries_added|old_entries_removed|
+-------+-----------------+-------------------+
|SUCCESS|2                |0                  |
+-------+-----------------+-------------------+
```



### View the created index status

The following code shows how to view the current index status, to check which indexes exist on the dataset and whether the index is up to date.

```scala
// Describe the index
xskipper.describeIndex(reader).show(false)
```

```
+-------------------------+---------------------------------------------------------+--------------------+
|Data Skipping Index Stats|cos://guyx27snotebooks-donotdelete-pr-kczcce0t0nsznp/data|             Comment|
+-------------------------+---------------------------------------------------------+--------------------+
|                   Status|                                               Up to date|                    |
|     Total objects ind...|                                                        2|                    |
|     # Metadata proper...|                                                         |                    |
|        Metadata location|                                     cos://guyx27snote...|                    |
|      # Index information|                                                         |                    |
|             # Index type|                                                  Columns|              Params|
|                   minmax|          
```

### List Indexed datasets

The following code shows how to view all indexed datasets under the current base location.

```scala
Xskipper.listIndexes(spark).show(false)
```

```
+---------------------------------------------------------+---------------------------------------------------------------------+-------------+
|Dataset                                                  |Index type                                                           |Index columns|
+---------------------------------------------------------+---------------------------------------------------------------------+-------------+
|# Metadatastore Manager parameters                       |                                                                     |             |
|Metadata base path                                       |cos://guyx27snotebooks-donotdelete-pr-kczcce0t0nsznp.service/metadata|             |
|cos://guyx27snotebooks-donotdelete-pr-kczcce0t0nsznp/data|minmax                                                               |temp         |
|                                                         |valuelist                                                            |city   
```

## Using the Data Skipping Indexes<a id='using'></a>

### Enable/Disable Xskipper<a id='enable_disable'></a>

Xskipper provides APIs to enable or disable index usage with Spark.

When Xskipper is enabled, its optimization rules become visible to the Apache Spark optimizer and are used in query optimization and execution.\
When Xskipper is disabled, its optimization rules no longer apply during query optimization. Note that disabling Xskipper has no impact on created indexes, as they remain intact.

```scala
// Enable Xskipper
spark.enableXskipper()

// Disable Xskipper
spark.disableXskipper()

// Check whether Xskipper is enabled, and enable it if it is not
if (!spark.isXskipperEnabled()) {
  spark.enableXskipper()
}
```

## Running Queries<a id='run_queries_dataset'></a>

Once Xskipper has been enabled, you can continue running queries (using either SQL or the DataFrame API) as usual and benefit from data skipping.

Create a temporary view on the dataset:

```scala
df.createOrReplaceTempView("sample")
```

### Example query using Min/max index

```scala
spark.sql("select * from sample where temp < 30").show()
```

```
+----+--------+---+----------+
|temp|    city|vid|        dt|
+----+--------+---+----------+
|20.0|Tel-Aviv|  a|2017-07-07|
+----+--------+---+----------+
```



##### Inspecting query skipping stats
You can inspect the data skipping statistics for the latest query using the following API:

```scala
Xskipper.getLatestQueryAggregatedStats(spark).show(false)
```

```
+-------+-----------+-------------+------------+-----------+----------+
|status |isSkippable|skipped_Bytes|skipped_Objs|total_Bytes|total_Objs|
+-------+-----------+-------------+------------+-----------+----------+
|SUCCESS|true       |903          |1           |1797       |2         |
+-------+-----------+-------------+------------+-----------+----------+
```



Note: The above returns the accumulated data skipping statistics for all of the datasets that were involved in the query.
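Assuming the aggregated stats are the per-dataset stats summed field by field (an interpretation, not a documented formula), the numbers in the tables can be combined and turned into a skipped-bytes ratio as follows. `SkippingStats` is a hypothetical case class mirroring the columns shown above.

```scala
// Hypothetical model of one row of the stats tables in this notebook.
final case class SkippingStats(skippedBytes: Long, skippedObjs: Long,
                               totalBytes: Long, totalObjs: Long) {
  // Assumed aggregation: sum each field across the datasets of one query.
  def +(other: SkippingStats): SkippingStats = SkippingStats(
    skippedBytes + other.skippedBytes, skippedObjs + other.skippedObjs,
    totalBytes + other.totalBytes, totalObjs + other.totalObjs)

  // Fraction of bytes that did not need to be read.
  def skippedBytesRatio: Double =
    if (totalBytes == 0) 0.0 else skippedBytes.toDouble / totalBytes
}

// Stats of the min/max query above; only one dataset was involved.
val sampleStats = SkippingStats(903, 1, 1797, 2)

// Aggregating over all datasets of the query (here, just one).
val aggregated = Seq(sampleStats).reduce(_ + _)
println(f"skipped ${aggregated.skippedBytesRatio * 100}%.1f%% of the bytes")
```

Because only one dataset is read here, the aggregated and per-dataset stats coincide, which is consistent with the two identical tables in this section.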

To inspect the stats for a specific dataset, call `getLatestQueryStats` on that dataset's Xskipper instance:

```scala
xskipper.getLatestQueryStats().show(false)
```

```
+-------+-----------+-------------+------------+-----------+----------+
|status |isSkippable|skipped_Bytes|skipped_Objs|total_Bytes|total_Objs|
+-------+-----------+-------------+------------+-----------+----------+
|SUCCESS|true       |903          |1           |1797       |2         |
+-------+-----------+-------------+------------+-----------+----------+
```



##### Clearing the stats before running the next query

The data skipping stats are accumulated over all dataset reads since the last time `clearStats` or `reset` was called.\
Here we clear the stats after each query to get the data skipping stats for each query separately.
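The accumulate-until-cleared behavior can be modeled with a small hypothetical accumulator (not Xskipper's internal class). It shows why, without clearing, the stats of consecutive queries are mixed together.

```scala
// Hypothetical accumulator: stats from every dataset read are added
// until clear() is called, mirroring the behavior described above.
final class StatsAccumulator {
  private var skippedObjs = 0L
  private var totalObjs = 0L

  def record(skipped: Long, total: Long): Unit = {
    skippedObjs += skipped
    totalObjs += total
  }

  def snapshot: (Long, Long) = (skippedObjs, totalObjs)

  def clear(): Unit = {
    skippedObjs = 0L
    totalObjs = 0L
  }
}

val acc = new StatsAccumulator
acc.record(skipped = 1, total = 2) // first query
acc.record(skipped = 1, total = 2) // second query, no clear in between
val withoutClear = acc.snapshot    // mixes both queries

acc.clear()
acc.record(skipped = 1, total = 2) // second query after clearing
val withClear = acc.snapshot       // reflects only this query
```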

```scala
Xskipper.clearStats()
```

### Example query using Value list index

```scala
spark.sql("select * from sample where city IN ('Jerusalem', 'Ramat-Gan')").show()
```

```
+----+---------+---+----------+
|temp|     city|vid|        dt|
+----+---------+---+----------+
|30.0|Jerusalem|  b|2017-07-08|
+----+---------+---+----------+
```



#### Inspecting query stats

```scala
Xskipper.getLatestQueryAggregatedStats(spark).show(false)
```

```
+-------+-----------+-------------+------------+-----------+----------+
|status |isSkippable|skipped_Bytes|skipped_Objs|total_Bytes|total_Objs|
+-------+-----------+-------------+------------+-----------+----------+
|SUCCESS|true       |894          |1           |1797       |2         |
+-------+-----------+-------------+------------+-----------+----------+
```



##### Clearing the stats before running the next query

```scala
Xskipper.clearStats()
```

### Example Query using Bloom filter index

```scala
spark.sql("select * from sample where vid = 'a'").show()
```

```
+----+--------+---+----------+
|temp|    city|vid|        dt|
+----+--------+---+----------+
|20.0|Tel-Aviv|  a|2017-07-07|
+----+--------+---+----------+
```



#### Inspecting query stats

```scala
Xskipper.getLatestQueryAggregatedStats(spark).show(false)
```

```
+-------+-----------+-------------+------------+-----------+----------+
|status |isSkippable|skipped_Bytes|skipped_Objs|total_Bytes|total_Objs|
+-------+-----------+-------------+------------+-----------+----------+
|SUCCESS|true       |903          |1           |1797       |2         |
+-------+-----------+-------------+------------+-----------+----------+
```



##### Clearing the stats before running the next query

```scala
Xskipper.clearStats()
```

## Index Life Cycle<a id='index_life_cycle'></a>

The following operations can be used to maintain the index.

### Refresh Index

Over time, the index might become stale if files are added to, removed from, or modified in the dataset.\
To bring the index up to date, call the refresh operation, which indexes the new or modified files and removes obsolete metadata.

Note: Even without refreshing, the index remains useful for files that have not changed since the last indexing time.
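The refresh logic described above can be modeled as a set difference over object listings. This is a hypothetical sketch, not Xskipper's implementation: it assumes an object is identified by its path and last-modified time, and the file names are made up. With the appended partition it yields one object to index and none to remove, and the new object is one third of the dataset, which is consistent with the refresh and status outputs below.

```scala
// Hypothetical object identity: path plus last-modified time, so that a
// modified file counts as a different object than its indexed version.
final case class ObjectInfo(path: String, lastModified: Long)

final case class RefreshPlan(toIndex: Set[ObjectInfo], toRemove: Set[ObjectInfo]) {
  def upToDate: Boolean = toIndex.isEmpty && toRemove.isEmpty
}

def planRefresh(indexed: Set[ObjectInfo], current: Set[ObjectInfo]): RefreshPlan =
  // New or modified objects are missing from the index; metadata entries
  // whose object was removed or modified are obsolete.
  RefreshPlan(toIndex = current -- indexed, toRemove = indexed -- current)

// Made-up file names for the two originally indexed partitions.
val indexed = Set(
  ObjectInfo("dt=2017-07-07/part-0.parquet", 1L),
  ObjectInfo("dt=2017-07-08/part-0.parquet", 1L))
// The append adds a third partition.
val current = indexed + ObjectInfo("dt=2017-07-09/part-0.parquet", 2L)

val plan = planRefresh(indexed, current)
val newFraction = plan.toIndex.size.toDouble / current.size
```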

```scala
val update_data = Seq(
  Row("2017-07-09", 25.0, "Beer-Sheva", "c")
)

val update_ds = spark.createDataFrame(
  spark.sparkContext.parallelize(update_data),
  StructType(schema)
)

// append to the existing dataset
update_ds.write.partitionBy("dt").mode("append").parquet(dataset_location)
```

Inspecting index status:

```scala
xskipper.describeIndex(reader).show(false)
```

```
+--------------------------+--------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------+
|Data Skipping Index Stats |cos://guyx27snotebooks-donotdelete-pr-kczcce0t0nsznp/data                                                                             |Comment                                                                    |
+--------------------------+--------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------+
|Status                    |Out of date - please use REFRESH operation to update the index                                                                        |                                                                           |
|Total new/modified objects|33% of the o
```

As you can see, there is one new object that is not indexed, so we call the refresh operation:

```scala
xskipper.refreshIndex(reader).show(false)
```

```
+-------+-----------------+-------------------+
|status |new_entries_added|old_entries_removed|
+-------+-----------------+-------------------+
|SUCCESS|1                |0                  |
+-------+-----------------+-------------------+
```



Inspecting index status following the refresh:

```scala
xskipper.describeIndex(reader).show(false)
```

```
+-------------------------+--------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------+
|Data Skipping Index Stats|cos://guyx27snotebooks-donotdelete-pr-kczcce0t0nsznp/data                                                                             |Comment                                                                    |
+-------------------------+--------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------+
|Status                   |Up to date                                                                                                                            |                                                                           |
|Total objects indexed    |3                
```

### Drop Index

To drop the index, use the following API call:

```scala
xskipper.dropIndex()
```

## Working with Hive Tables<a id='hive_tables'></a>

Xskipper also supports skipping over Hive tables.

The API for working with Hive tables is similar to the API presented above, with two major differences:
1. The `uri` used in the Xskipper constructor is the table identifier in the form: `<db>.<table>`.
2. The API calls do not require a `DataFrameReader`.

For more information about the API, see [here]().

The metadata location for a Hive table is resolved as follows:
1. If the table contains the parameter `io.xskipper.parquet.mdlocation`, its value is used as the metadata location.
2. Otherwise, Xskipper looks up the parameter `io.xskipper.parquet.mdlocation` in the table's database and uses it as the base metadata location for all tables.
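The two-step resolution order can be sketched as follows. The property key is the one used in this notebook; the function `resolveMdLocation` and the result types are hypothetical and only illustrate that table properties take precedence over database properties.

```scala
// Property key used throughout this notebook.
val MdLocationKey = "io.xskipper.parquet.mdlocation"

// Hypothetical result types distinguishing where the location came from.
sealed trait ResolvedLocation
final case class TableLocation(path: String) extends ResolvedLocation
final case class DatabaseBaseLocation(basePath: String) extends ResolvedLocation

// 1. Table property wins; 2. otherwise fall back to the database property.
def resolveMdLocation(tableProps: Map[String, String],
                      dbProps: Map[String, String]): Option[ResolvedLocation] =
  tableProps.get(MdLocationKey).map[ResolvedLocation](p => TableLocation(p))
    .orElse(dbProps.get(MdLocationKey).map(p => DatabaseBaseLocation(p)))
```

If neither property is set, the sketch returns `None`; how Xskipper itself reports that case is not covered here.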

Note: During indexing, the index location parameter can be automatically added to the table properties if the Xskipper instance is configured accordingly.  
For more information about the metadata location configuration, see [here]().

In this example we will set the base location in the database.

### Setting the base metadata location in the database

```scala
val alter_db_ddl = s"ALTER DATABASE default SET DBPROPERTIES ('io.xskipper.parquet.mdlocation'='${md_base_location}')"
spark.sql(alter_db_ddl)
```


### Creating a sample Hive Table

Create the table

```scala
val create_table_ddl =
      s"""CREATE TABLE IF NOT EXISTS tbl (
         |temp Double,
         |city String,
         |vid String,
         |dt String
         |)
         |USING PARQUET
         |PARTITIONED BY (dt)
         |LOCATION '${dataset_location}'
         |""".stripMargin
spark.sql(create_table_ddl)
```


Recover the table partitions

```scala
spark.sql("ALTER TABLE tbl RECOVER PARTITIONS")
```


Verify that the table was created:

```scala
spark.sql("show tables").show(false)
spark.sql("show partitions tbl").show(false)
```

```
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
|default |tbl      |false      |
|        |sample   |true       |
+--------+---------+-----------+

+-------------+
|partition    |
+-------------+
|dt=2017-07-07|
|dt=2017-07-08|
|dt=2017-07-09|
+-------------+
```



### Indexing a Hive Table

Notice that we use `default.tbl` as the URI in the Xskipper constructor.

```scala
// create Xskipper instance for the sample Hive Table
val xskipper_hive = new Xskipper(spark, "default.tbl")

// remove existing index if needed
if (xskipper_hive.isIndexed()) {
  xskipper_hive.dropIndex()
}

xskipper_hive.indexBuilder()
        .addMinMaxIndex("temp")
        .addValueListIndex("city")
        .addBloomFilterIndex("vid")
        .build()
        .show(false)
```

```
+-------+-----------------+-------------------+
|status |new_entries_added|old_entries_removed|
+-------+-----------------+-------------------+
|SUCCESS|3                |0                  |
+-------+-----------------+-------------------+
```



### View the created index status

The following code shows how to view the current index status, to check which indexes exist on the dataset and whether the index is up to date.

```scala
xskipper_hive.describeIndex().show(false)
```

```
+-------------------------+--------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------+
|Data Skipping Index Stats|default.tbl                                                                                                                           |Comment                                                                    |
+-------------------------+--------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------+
|Status                   |Up to date                                                                                                                            |                                                                           |
|Total objects indexed    |3                
```

### Enable/Disable Xskipper

Make sure Xskipper is enabled

```scala
// Check whether Xskipper is enabled, and enable it if it is not
if (!spark.isXskipperEnabled()) {
  spark.enableXskipper()
}
```

## Running Queries

Once Xskipper has been enabled, you can continue running queries (using either SQL or the DataFrame API) as usual and benefit from data skipping.

### Example query using Min/max index

```scala
spark.sql("select * from tbl where temp < 30").show(false)
```

```
+----+----------+---+----------+
|temp|city      |vid|dt        |
+----+----------+---+----------+
|25.0|Beer-Sheva|c  |2017-07-09|
|20.0|Tel-Aviv  |a  |2017-07-07|
+----+----------+---+----------+
```



##### Inspecting query skipping stats
You can inspect the data skipping statistics for the latest query using the following API:

```scala
Xskipper.getLatestQueryAggregatedStats(spark).show(false)
```

```
+-------+-----------+-------------+------------+-----------+----------+
|status |isSkippable|skipped_Bytes|skipped_Objs|total_Bytes|total_Objs|
+-------+-----------+-------------+------------+-----------+----------+
|SUCCESS|true       |903          |1           |2709       |3         |
+-------+-----------+-------------+------------+-----------+----------+
```



## Index Life Cycle

The following operations can be used to maintain the index.

### Refresh Index on Hive Table

```scala
xskipper_hive.refreshIndex().show(false)
```

```
+-------+-----------------+-------------------+
|status |new_entries_added|old_entries_removed|
+-------+-----------------+-------------------+
|SUCCESS|0                |0                  |
+-------+-----------------+-------------------+
```



### Drop Index

To drop the index, use the following API call.
Dropping the index also removes the index location from the table properties.

```scala
xskipper_hive.dropIndex()
```