tsfile-spark-connector

Used to read and write (write support is still in development) TsFile in Spark.

This connector exposes one or more TsFiles as a table in SparkSQL. You can specify a single directory, or use wildcards to match multiple directories. If multiple TsFiles are loaded, the table schema keeps the union of the sensors in all TsFiles.
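
For example, a wildcard path can load several TsFiles into one table. A minimal sketch (the implicit tsfile reader comes from the import shown in the Examples section; the path here is hypothetical):

     import cn.edu.tsinghua.tsfile._

     //load every matching TsFile into a single table;
     //the schema is the union of the sensors in all matched files
     val df = spark.read.tsfile("/data/*/*.tsfile")
     df.createOrReplaceTempView("tsfile_table")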

1. Dependency

This project depends on TsFile:

https://github.com/thulab/tsfile.git

2. Versions

The versions required for Spark and Java are as follows:

Spark Version | Scala Version | Java Version | TsFile Version
2.0+          | 2.11          | 1.8          | 0.4.0

ATTENTION: Please check the jar packages in the root directory of your Spark installation and replace libthrift-0.9.2.jar and libfb303-0.9.2.jar with libthrift-0.9.1.jar and libfb303-0.9.1.jar respectively.

3. TsFile Type <=> SparkSQL Type

This library uses the following mapping from TsFile data types to SparkSQL data types:

TsFile     | SparkSQL
INT32      | IntegerType
INT64      | LongType
FLOAT      | FloatType
DOUBLE     | DoubleType
BYTE_ARRAY | StringType
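
After loading a TsFile you can check how its types were mapped, e.g. with printSchema (a minimal sketch, assuming the sample file used in the Examples section; the exact output depends on your file):

     val df = spark.read.format("cn.edu.tsinghua.tsfile").load("/test.tsfile")
     //prints the SparkSQL schema derived from the TsFile types, e.g.:
     // root
     //  |-- time: long
     //  |-- delta_object: string
     //  |-- sensor_1: float
     df.printSchema()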

4. TsFile Schema <-> SparkSQL Table Structure

The following set of time-series data is used to illustrate the mapping from TsFile schema to SparkSQL table structure.

delta_object: root.car.turbine1

sensor_1        sensor_2        sensor_3
time  value     time  value     time  value
1     1.2       1     20        2     50
3     1.4       2     20        4     51
5     1.1       3     21        6     52
7     1.8       4     20        8     53

A set of time-series data

4.1. Using delta_object as a reserved column

There are two reserved columns in the SparkSQL table:

  • time : timestamp, LongType
  • delta_object : delta_object ID, StringType

The SparkSQL table structure is as follows:

time(LongType)  delta_object(StringType)  sensor_1(FloatType)  sensor_2(IntegerType)  sensor_3(IntegerType)
1               root.car.turbine1         1.2                  20                     null
2               root.car.turbine1         null                 20                     50
3               root.car.turbine1         1.4                  21                     null
4               root.car.turbine1         null                 20                     51
5               root.car.turbine1         1.1                  null                   null
6               root.car.turbine1         null                 null                   52
7               root.car.turbine1         1.8                  null                   null
8               root.car.turbine1         null                 null                   53
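
Since delta_object is an ordinary StringType column, it can be filtered like any other column. A minimal sketch against the table above:

     import cn.edu.tsinghua.tsfile._

     val df = spark.read.tsfile("/test.tsfile")
     df.createOrReplaceTempView("tsfile_table")
     //keep only the rows of one device
     spark.sql("select * from tsfile_table where delta_object = 'root.car.turbine1'").show()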

4.2. Unfolding the delta_object column

If you want to unfold the delta_object column into multiple columns, add an option when reading and writing:

e.g.

option("delta_object_name", "root.device.turbine")

"delta_object_name" is a reserved key.

The SparkSQL table structure is then as follows:

time(LongType)  device(StringType)  turbine(StringType)  sensor_1(FloatType)  sensor_2(IntegerType)  sensor_3(IntegerType)
1               car                 turbine1             1.2                  20                     null
2               car                 turbine1             null                 20                     50
3               car                 turbine1             1.4                  21                     null
4               car                 turbine1             null                 20                     51
5               car                 turbine1             1.1                  null                   null
6               car                 turbine1             null                 null                   52
7               car                 turbine1             1.8                  null                   null
8               car                 turbine1             null                 null                   53

You can then group by any level of the delta_object hierarchy, and with the same option you can write this DataFrame back to a TsFile, as sketched below.
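
A minimal sketch of grouping on an unfolded column and writing back with the same option (the aggregation itself is plain Spark; paths follow the examples below):

     import cn.edu.tsinghua.tsfile._

     val df = spark.read.option("delta_object_name", "root.device.turbine").tsfile("/test.tsfile")

     //group by one level of the delta_object hierarchy
     df.groupBy("device").count().show()

     //write the unfolded dataframe back to TsFile with the same option
     df.write.option("delta_object_name", "root.device.turbine").tsfile("/out")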

5. Building

mvn clean scala:compile compile package

6. Examples

The file 'test.tsfile' used in the following examples is provided at "data/test.tsfile" in this repository. Please upload 'test.tsfile' to HDFS in advance; the examples assume the HDFS path "/test.tsfile".

6.1 Scala API

  • Example 1

     import cn.edu.tsinghua.tsfile._
     
     //read data in TsFile and create a table
     val df = spark.read.tsfile("/test.tsfile")
     df.createOrReplaceTempView("tsfile_table")
     
     //query with filter
     val newDf = spark.sql("select * from tsfile_table where s1 > 1.2").cache()
     
     newDf.show()
  • Example 2

     val df = spark.read
        .format("cn.edu.tsinghua.tsfile")
        .load("/test.tsfile ")
     df.filter("time < 10").show()
  • Example 3

     //create a table in SparkSQL and build relation with a TsFile
     spark.sql("create temporary view tsfile_table using cn.edu.tsinghua.tsfile options(path = \"test.ts\")")
     
     spark.sql("select * from tsfile_table where s1 > 1.2").show()
  • Example 4 (using options to read)

     import cn.edu.tsinghua.tsfile._
     
     val df = spark.read.option("delta_object_name", "root.device.turbine").tsfile("/test.tsfile")
          
     //create a table in SparkSQL and build relation with a TsFile
     df.createOrReplaceTempView("tsfile_table")
      
     spark.sql("select * from tsfile_table where turbine = 'd1' and device = 'car' and time < 10").show()
  • Example 5 (write)

     import cn.edu.tsinghua.tsfile._
     
     val df = spark.read.tsfile("/test.tsfile")
     df.write.tsfile("/out")
  • Example 6 (using options to write)

     import cn.edu.tsinghua.tsfile._
     
     val df = spark.read.option("delta_object_name", "root.device.turbine").tsfile("/test.tsfile")
          
     df.write.option("delta_object_name", "root.device.turbine").tsfile("/out")

6.2 spark-shell

6.2.1 Start Spark

6.2.1.1 Local Mode
./spark-2.0.1-bin-hadoop2.7/bin/spark-shell  --jars  tsfile-0.4.0.jar,tsfile-spark-connector-0.4.0.jar

ATTENTION:

  • Please replace "spark-2.0.1-bin-hadoop2.7/bin/spark-shell" with the real path of your spark-shell.
  • Multiple jar packages are separated by commas without any spaces.
  • The latest version used is v0.4.0.
6.2.1.2 Distributed Mode
./spark-2.0.1-bin-hadoop2.7/bin/spark-shell  --jars  tsfile-0.4.0.jar,tsfile-spark-connector-0.4.0.jar  --master spark://ip:7077