User Guide

IndexR currently works as an Apache Hive file format and an Apache Drill storage plugin. We create and manage IndexR tables via indexr-tool, run fast queries via Drill, and update partitions or run long/complex SQLs via Hive. We can also ingest streaming events from Apache Kafka in realtime.

Supported Data Types

  • int
  • bigint, alias: long
  • float
  • double
  • varchar, alias: string
  • date. Format: 2017-02-17.
  • datetime, alias: timestamp. Format: 2017-02-17 12:30:03 or 2017-02-17T12:30:03. Supports millisecond precision, e.g. 2017-02-17 12:30:03.214.
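
For instance, columns of these types can be declared in a schema file like the one described below; the column names here are purely illustrative:

{"name": "event_date", "dataType": "date"},
{"name": "event_time", "dataType": "datetime"}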

Note: If you are using the date or datetime types, make sure that Hive and Hadoop use the same timezone, e.g.:

  • export HADOOP_CLIENT_OPTS="-Duser.timezone=UTC" in the hive-env.sh file.
  • set mapreduce.reduce.java.opts and mapreduce.map.java.opts with -Duser.timezone=UTC in the mapred-site.xml file.
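
For example, in mapred-site.xml the flag can be appended to the existing JVM options; the heap sizes below are placeholders, keep whatever your cluster already uses:

<property>
  <name>mapreduce.map.java.opts</name>
  <value>-Xmx1536m -Duser.timezone=UTC</value>
</property>
<property>
  <name>mapreduce.reduce.java.opts</name>
  <value>-Xmx3072m -Duser.timezone=UTC</value>
</property>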

Table Management

Create Table In IndexR

Assume we are going to create a new table called test. Edit the table schema, e.g. test_schema.json, then declare the table via indexr-tool.

$> cd .../indexr-tool
$> bin/tools.sh -cmd settb -t test -c test_schema.json
$> bin/tools.sh -cmd listtb

You can check all commands by typing bin/tools.sh -h.

Go to the Drill console and check whether the table test has been created.

$> cd .../drill
$> bin/drill-conf
0: jdbc:drill:> use indexr;
0: jdbc:drill:> show tables;

Here is the content of test_schema.json:

{
    "schema":{
        "columns":
        [
            {"name": "date", "dataType": "int"},
            {"name": "d1", "dataType": "string"},
            {"name": "m1", "dataType": "int"},
            {"name": "m2", "dataType": "bigint"},
            {"name": "m3", "dataType": "float", "default": "-0.1"},
            {"name": "m4", "dataType": "double"}
        ]
    },
    "location": "/indexr/segment/test",
    "mode": "vlt",
    "agg":{
        "grouping": true,
        "dims": [
            "date",
            "d1"
        ],
        "metrics": [
            {"name": "m1", "agg": "sum"},
            {"name": "m2", "agg": "min"},
            {"name": "m3", "agg": "max"},
            {"name": "m4", "agg": "first"}
        ]
    }
}
  • schema - [required] the schema of the table.
    • columns - defines all columns of the table.

  • location - (v0.6.0+) the data path of the table. It will be set to ${indexr.fs.data.root}/segment/<tableName> if omitted. Note that this option cannot be changed after table creation.

  • mode - [default vlt] the segment mode. Supports basic and vlt.

  • sort.columns - [deprecated, use agg instead]

  • agg - the pre-aggregation setting, default null.

    • grouping - [optional, default false] whether to roll up the events or not. If true, dims and metrics are required.

    Rollup merges events with the same dimensions together, which works like the SQL below (see the example after this list):

     select `date`, d1, sum(m1), min(m2), max(m3), first(m4) from table group by `date`, d1

    It can greatly reduce data size in OLAP scenarios, since we don't care about individual events, but the whole picture across dimension groups.

    • dims - [required if grouping is true] the rollup dimensions.
    • metrics - [required if grouping is true] the rollup metrics. Currently supported aggregation functions:
      • sum
      • min
      • max
      • first
      • last
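
For example, with the agg setting above, two ingested events sharing the same `date` and d1 values are merged into one row; the values below are illustrative:

input:  {"date": 20160701, "d1": "mac", "m1": 100, "m2": 5, "m3": 1.2, "m4": 7.0}
input:  {"date": 20160701, "d1": "mac", "m1": 50, "m2": 3, "m3": 0.8, "m4": 9.0}
rollup: {"date": 20160701, "d1": "mac", "m1": 150, "m2": 3, "m3": 1.2, "m4": 7.0}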

Create Table In Hive

Besides the table in Drill, we also need a table in Hive for table content management.

You can use indexr-tool to generate the Hive table creation SQL.

$> cd .../indexr-tool
$> bin/tools.sh -cmd hivesql -t test -col dt

Open the Hive console, create a Hive table, and insert some rows.

$> cd .../hive
$> bin/hive
hive (default)> CREATE EXTERNAL TABLE IF NOT EXISTS test (
  `date` int,
  `d1` string,
  `m1` int,
  `m2` bigint,
  `m3` float,
  `m4` double
) 
PARTITIONED BY (`dt` string)
ROW FORMAT SERDE 'io.indexr.hive.IndexRSerde' 
STORED AS INPUTFORMAT 'io.indexr.hive.IndexRInputFormat' 
OUTPUTFORMAT 'io.indexr.hive.IndexROutputFormat' 
LOCATION '/indexr/segment/test' 
TBLPROPERTIES (
  'indexr.segment.mode'='vlt',
  'indexr.index.columns'='d1',
  'indexr.agg.grouping'='true',
  'indexr.agg.dims'='date,d1',
  'indexr.agg.metrics'='m1:sum,m2:min,m3:max,m4:first'
) 
;
hive (default)> insert into table test partition (dt=20160701) values(20160701,'mac',100,192444,1.55,-331.43555);
hive (default)> select * from test limit 10;
hive (default)> select `date`, sum(m1) from test group by `date` order by sum(m1) desc limit 10;

Some notes:

  • Make sure the Hive table schema contains non-partition columns with the same meaning as the partition columns, e.g. "date" for "dt".

    Here we use dt as the partition column and date as the corresponding non-partition column, so the data is partitioned by date.

  • Specify correct FORMAT and SERDE

     ROW FORMAT SERDE 
     	'io.indexr.hive.IndexRSerde' 
     STORED AS 
     	INPUTFORMAT 'io.indexr.hive.IndexRInputFormat' 
     	OUTPUTFORMAT 'io.indexr.hive.IndexROutputFormat' 
    
  • Specify correct LOCATION path.

    Currently an IndexR Hive table location is fixed at ${indexr.fs.data.root}/segment/<tableName>, where indexr.fs.data.root is a config option in the indexr.config.properties file.

  • EXTERNAL mode is suggested if you want to update the table schema later with minimal effort.

  • TBLPROPERTIES.

    • indexr.index.columns - [default not indexed] the columns which are indexed, separated by commas, e.g. "col0,col1".
    • indexr.segment.mode - [default vlt] the segment mode. Supports basic and vlt.
    • indexr.agg.grouping, indexr.agg.dims, indexr.agg.metrics - rollup settings, especially useful in OLAP scenarios. Feel free to omit them if you don't need pre-aggregation. Default: none.

After updating any data in HDFS, we should notify IndexR to follow up.

$> cd .../indexr-tool
$> bin/tools.sh -cmd notifysu -t test

Now we can go to the Drill console and query the data generated by Hive.
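
For instance, the row inserted above should now be visible through the indexr storage plugin shown earlier (the exact table path may differ in your Drill setup):

$> cd .../drill
$> bin/drill-conf
0: jdbc:drill:> use indexr;
0: jdbc:drill:> select * from test limit 10;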

Setup Realtime Ingestion

In many cases we want to analyze events immediately after they take place. IndexR's realtime ingestion mechanism comes to help: we can make some IndexR nodes fetch events from Kafka in realtime, and those events can be queried right after they arrive.

Edit test_schema.json and add the realtime setting.

{
    "schema":{
        "columns":
        [
            {"name": "date", "dataType": "int"},
            {"name": "d1", "dataType": "string", "index": true},
            {"name": "m1", "dataType": "int"},
            {"name": "m2", "dataType": "bigint"},
            {"name": "m3", "dataType": "float", "default": "-0.1"},
            {"name": "m4", "dataType": "double"}
        ]
    },
    "mode": "vlt",
    "agg":{
        "grouping": true,
        "dims": [
            "date",
            "d1"
        ],
        "metrics": [
            {"name": "m1", "agg": "sum"},
            {"name": "m2", "agg": "min"},
            {"name": "m3", "agg": "max"},
            {"name": "m4", "agg": "first"}
        ]
    },
    "realtime":{
        "name.alias": {
            "date": "dt",
            "m1": "m1_alias"
        },
        "tag.setting": {
            "tag.field": "_tag_",
            "accept.tags": ["a"],
            "accept.none": false
        },
        "save.period.minutes": 20,
        "upload.period.minutes": 60,
        "max.row.memory": 500000,
        "max.row.realtime": 10000000,
        "fetcher": {
            "type": "kafka-0.8",
            "topic": "test_topic",
            "number.empty.as.zero": false,
            "properties": {
                "zookeeper.connect": "localhost:2181",
                "zookeeper.connection.timeout.ms": "15000",
                "zookeeper.session.timeout.ms": "40000",
                "zookeeper.sync.time.ms": "5000",
                "fetch.message.max.bytes": "1048586",
                "auto.offset.reset": "largest",
                "auto.commit.enable": "true",
                "auto.commit.interval.ms": "5000",
                "group.id": "test_group"
            }
        }
    }
}

  • schema - the schema of the table.

    • columns - defines all columns of the table. Here you can decide which columns are indexed via the index option.
  • mode - [default vlt] the segment mode. Supports basic and vlt.

  • agg - the pre-aggregation setting, default null. Check the previous section for details.

  • realtime - [optional] realtime ingestion settings. Only required when realtime ingestion is needed.

    • name.alias - [optional, default none] you can specify an alias for a column name used in the events.
    • tag.setting - [optional, default accept all] a realtime table only fetches events with matching tags.
      • tag.field - the field in the event which holds the tags
      • accept.tags - the tags this table accepts
      • accept.none - whether to accept events without any tags
    • save.period.minutes - [optional, default 20] how often the rows in memory are saved to disk. Normally the default setting is good enough for most cases.
    • upload.period.minutes - [optional, default 60] how often the rows on realtime nodes are uploaded to the storage system. The longer the upload period, the better the rollup, but the longer the delay before the Hive table sees the data.
    • max.row.memory - [optional, default 500000] the maximum number of rows this realtime table keeps in memory. Normally the default setting is good enough for most cases. If your nodes run on machines with very little available memory, consider decreasing it.
    • max.row.realtime - [optional, default 10000000] the maximum number of rows this realtime table keeps on the local node before merging and uploading them. The larger the value, the better the rollup, but the longer the delay before the Hive table sees the data.
    • fetcher - the data source setting.
      • type - fetcher type, currently supports kafka-0.8
      • topic - kafka topic
      • topics - kafka topics, if you want to fetch from multiple topics, e.g. "topics": ["topic0", "topic1"]. This setting overrides topic if set.
      • number.empty.as.zero - [default false]. Sometimes you may want to parse an empty string as the number zero; set this to true in that case. Otherwise such an event is treated as illegal and dropped.
      • properties - kafka consumer properties. Please pay attention to specifying the correct group.id.

Now update the table setting via indexr-tool:

$> cd .../indexr-tool
$> bin/tools.sh -cmd settb -t test -c test_schema.json
$> bin/tools.sh -cmd dctb -t test

List the currently available IndexR nodes.

$> bin/tools.sh -cmd listnode
hostA
hostB
hostC
hostD

Pick some nodes you like, e.g. hostA,hostC, and make them start fetching events.

$> bin/tools.sh -cmd addrtt -t test -host hostA,hostC
OK
$> bin/tools.sh -cmd rttnode -t test
hostA
hostC

Push some messages into the Kafka topic test_topic. The messages are encoded in UTF-8 JSON format, for example:

{"_tag_": "a,b", "date": 20160702, "d1": "mac", "m1": 100, "m2": 48224, "m3": 0.76}

One message can contain many events, separated by ,:

{...},{...},{...}
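
For instance, a single message carrying two events might look like this (the values are illustrative):

{"_tag_": "a", "date": 20160702, "d1": "mac", "m1": 100, "m2": 48224, "m3": 0.76},{"_tag_": "a", "date": 20160702, "d1": "pc", "m1": 28, "m2": 1132, "m3": 1.14}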

Go to the Drill console and check whether your events have been ingested.

$> cd .../drill
$> bin/drill-conf
0: jdbc:drill:> select * from test where d1 = 'mac' limit 10;

Setup Realtime To Historical

When realtime ingestion is set up, the realtime events are continuously grouped into segments and uploaded to the <table_path>/rt folder.

$> bin/hdfs dfs -ls -R /indexr/segment/test
/indexr/segment/test/__UPDATE__
/indexr/segment/test/dt=20160701/
/indexr/segment/test/dt=20160701/000000_0
/indexr/segment/test/rt/
/indexr/segment/test/rt/rtsg.201607021725.e06ca84e-1d65-4f37-a799-b406d01dd10e.seg
/indexr/segment/test/rt/rtsg.201607021825.fop91s73-822s-nxk1-0091-pa1dcw9s1slk.seg

This could produce problems: rt is not a legal Hive table partition path, and worse, it contains rows which could belong to different partitions. This data cannot be managed until we shuffle it into the correct partitions.

Luckily you can use Hive's Dynamic Partition Inserts.

  • Move all segments in the rt folder to a new folder, e.g. rt2his, and notify the segment update.
hdfs dfs -mkdir /indexr/segment/test/rt2his
hdfs dfs -mv "/indexr/segment/test/rt/*" "/indexr/segment/test/rt2his/"
tools.sh -cmd notifysu -t test
  • Create another Hive table, e.g. test_rt2his, with its location pointing to the <table_path>/rt2his path.
CREATE EXTERNAL TABLE IF NOT EXISTS test_rt2his (
  `date` int,
  `d1` string,
  `m1` int,
  `m2` bigint,
  `m3` float,
  `m4` double
)
ROW FORMAT SERDE 'io.indexr.hive.IndexRSerde' 
STORED AS INPUTFORMAT 'io.indexr.hive.IndexRInputFormat' 
OUTPUTFORMAT 'io.indexr.hive.IndexROutputFormat' 
LOCATION '/indexr/segment/test/rt2his'
;
  • Insert rows from the test_rt2his table into the real test table with a dynamic partition insert (see the note after this list). e.g.
insert into table test partition (dt) select *, `date` from test_rt2his;

  • Remove segments in rt2his folder, and notify updates.
hdfs dfs -rm -r "/indexr/segment/test/rt2his/*"
tools.sh -cmd notifysu -t test
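
Note that Hive may reject the dynamic partition insert above under its default strict mode, since no static partition value is given. If that happens, enabling nonstrict dynamic partitioning for the session usually helps; this is a sketch, your cluster defaults may already allow it:

set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
insert into table test partition (dt) select *, `date` from test_rt2his;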

Update Table Schema

Use upcol.sh in indexr-tool.

  • Stop any tasks, like rt2his, which could manipulate the data in the Hive table.

  • Update the table schema in IndexR by editing test_schema.json and running bin/tools.sh -cmd settb -t test -c test_schema.json. This only affects the updated columns; queries on untouched columns should work normally.

  • Update the hive table schema by ALTER TABLE <tb> ADD COLUMNS(...) or simply dropping the old and creating a new one if your table is in EXTERNAL mode.

  • [optional] If your table has realtime ingestion set up, wait for the old segments to be uploaded into the storage system, i.e. the rt folder in the Hive table path. The duration depends on how much realtime segment data is on the local IndexR nodes. Normally 20 minutes is safe enough.

  • Update historical segments.

    • Add columns
     $> bin/upcol.sh -add -t test -col '[{"name": "m5", "dataType": "long", "value": "100"}]'

    Here value could be a literal value or a SQL expression, e.g. if((a > b), a + b, a / c). You can specify multiple columns. You can also put the JSON param in a file and reference it with '@', e.g. -col @add_cols.json (see the sketch after this list).

    • Remove columns
     $> bin/upcol.sh -del -t test -col m1,m2
    • Alter columns
     $> bin/upcol.sh -alt -t test -col '[{"name": "m1", "dataType": "string", "value": "cast(m1, string)"}]'

    These scripts run MapReduce jobs and could take a while to complete, depending on the size of your data.
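
For reference, an add_cols.json file passed via -col @add_cols.json would hold the same JSON array used inline above; the column name m5 and its default value are illustrative:

[
    {"name": "m5", "dataType": "long", "value": "100"}
]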

Drop Table

  • Remove all realtime ingestion nodes of the table.
$> bin/tools.sh -cmd rttnode -t test
test:
-----------
hostA
hostB
$> bin/tools.sh -cmd rmrtt -t test -host hostA,hostB
OK
  • Wait for the realtime segments to be uploaded to HDFS. The duration depends on how much realtime segment data is on the local IndexR nodes. Normally 20 minutes is safe enough.

  • Remove segment files on HDFS and notify segment update.

$> hdfs dfs -rm -r ${indexr.fs.data.root}/segment/test
$> .../indexr-tool/bin/tools.sh -cmd notifysu -t test
  • Check if any data is left behind: select count(*) from test

  • Remove IndexR table.

$> bin/tools.sh -cmd rmtb -t test
  • Remove Hive table.
hive (default)> drop table test;

Data Management

Currently you can manage partition data in the Hive table with Hive scripts.

Note - After updating any data in HDFS, we should notify IndexR to follow up:

$> cd .../indexr-tool
$> bin/tools.sh -cmd notifysu -t <table_name>
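
For example, removing one partition's data could look like the following sketch; the partition value is illustrative, and since dropping a partition of an EXTERNAL table in Hive does not delete the underlying segment files, they are removed on HDFS first:

$> hdfs dfs -rm -r /indexr/segment/test/dt=20160701
$> cd .../hive
$> bin/hive
hive (default)> alter table test drop if exists partition (dt='20160701');
$> cd .../indexr-tool
$> bin/tools.sh -cmd notifysu -t test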

Query

You can query data by Drill or Hive.

Drill reference: Drill Document

Hive reference: Hive User Document