Using IndexR in Spark

Starting from indexr-0.6.0, IndexR supports Spark 2.1.0+. You can use Spark to manage and query tables in the IndexR file format. The IndexR file format supports all the operations Spark supports, just as Parquet does.

Note that IndexR in Spark does not support realtime ingestion, due to Spark's execution architecture.

Follow the instructions here to set up the IndexR file format in Spark.

Here we use a test table with the following schemas as an example.

IndexR schema:

{
    "schema":{
        "columns":
        [
            {"name": "date", "dataType": "int"},
            {"name": "d1", "dataType": "string"},
            {"name": "m1", "dataType": "int"},
            {"name": "m2", "dataType": "bigint"},
            {"name": "m3", "dataType": "float", "default": "-0.1"},
            {"name": "m4", "dataType": "double"}
        ]
    },
    "location": "/indexr/segment/test",
    "mode": "vlt",
    "agg":{
        "grouping": true,
        "dims": [
            "date",
            "d1"
        ],
        "metrics": [
            {"name": "m1", "agg": "sum"},
            {"name": "m2", "agg": "min"},
            {"name": "m3", "agg": "max"},
            {"name": "m4", "agg": "first"}
        ]
    }
}
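
The agg section asks IndexR to pre-aggregate (roll up) the stored rows: rows sharing the same values of the dims columns are merged, and each metric column is combined with its configured aggregator. Conceptually, the stored data is the result of the following GROUP BY, a sketch in which raw_rows stands for the hypothetical unaggregated input:

SELECT `date`, `d1`,
       sum(m1)   AS m1,
       min(m2)   AS m2,
       max(m3)   AS m3,
       first(m4) AS m4
FROM raw_rows
GROUP BY `date`, `d1`;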

Hive schema:

CREATE EXTERNAL TABLE IF NOT EXISTS test (
  `date` int,
  `d1` string,
  `m1` int,
  `m2` bigint,
  `m3` float,
  `m4` double
) 
PARTITIONED BY (`dt` string)
ROW FORMAT SERDE 'io.indexr.hive.IndexRSerde' 
STORED AS INPUTFORMAT 'io.indexr.hive.IndexRInputFormat' 
OUTPUTFORMAT 'io.indexr.hive.IndexROutputFormat' 
LOCATION '/indexr/segment/test' 
TBLPROPERTIES (
  'indexr.segment.mode'='vlt',
  'indexr.index.columns'='d1',
  'indexr.agg.grouping'='true',
  'indexr.agg.dims'='date,d1',
  'indexr.agg.metrics'='m1:sum,m2:min,m3:max,m4:first'
) 
;
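
Since the Hive table is partitioned by dt, each partition must be registered before Hive can see its segments. A minimal sketch in standard Hive DDL, assuming one day's segments live under a dt=20160702 subdirectory of the table location (the path and partition value are illustrative):

ALTER TABLE test ADD IF NOT EXISTS PARTITION (dt='20160702')
LOCATION '/indexr/segment/test/dt=20160702';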

Spark schema:

CREATE TABLE test_spark (
  `date` int,
  `d1` string,
  `m1` int,
  `m2` bigint,
  `m3` float,
  `m4` double,
  `dt` string
) 
USING org.apache.spark.sql.execution.datasources.indexr.IndexRFileFormat
OPTIONS (
  'path'='/indexr/segment/test',
  'indexr.segment.mode'='vlt',
  'indexr.index.columns'='d1',
  'indexr.agg.grouping'='true',
  'indexr.agg.dims'='date,d1',
  'indexr.agg.metrics'='m1:sum,m2:min,m3:max,m4:first'
) 
PARTITIONED BY (dt)
;
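
The OPTIONS mirror the settings from the IndexR schema and the Hive TBLPROPERTIES, and dt is declared as an ordinary column and then named in PARTITIONED BY. Filtering on dt therefore prunes whole partitions before any segment is read. A sketch query (the partition value is illustrative):

SELECT `d1`, sum(m1) AS m1
FROM test_spark
WHERE dt = '20160702'
GROUP BY `d1`;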

Create Spark Table

> CREATE TABLE test_spark (
  `date` int,
  `d1` string,
  `m1` int,
  `m2` bigint,
  `m3` float,
  `m4` double,
  `dt` string
) 
USING org.apache.spark.sql.execution.datasources.indexr.IndexRFileFormat
OPTIONS (
  'path'='/indexr/segment/test',
  'indexr.segment.mode'='vlt',
  'indexr.index.columns'='d1',
  'indexr.agg.grouping'='true',
  'indexr.agg.dims'='date,d1',
  'indexr.agg.metrics'='m1:sum,m2:min,m3:max,m4:first'
) 
PARTITIONED BY (dt)
;

> msck repair table test_spark;
> select * from test_spark limit 10;
> insert into table test_spark partition (dt=20160702) values(20160702,'mac',100,192444,1.55,-331.43555);
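
msck repair table scans the table location and registers any partitions written outside of Spark, e.g. by a Hive job; run it whenever segments appear on HDFS that Spark does not yet know about. Besides row-by-row VALUES inserts, a typical batch load selects from another table. A sketch, where staging_events is a hypothetical source table:

> INSERT OVERWRITE TABLE test_spark PARTITION (dt=20160703)
  SELECT `date`, `d1`, `m1`, `m2`, `m3`, `m4`
  FROM staging_events
  WHERE `date` = 20160703;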
