join not only by time but additionally also by column #54

Closed
geoHeil opened this issue Oct 20, 2018 · 3 comments

geoHeil commented Oct 20, 2018

How can I join not only by time but also by a column?

Currently I get a "Found duplicate columns" error, but I would like to perform the time series join per group.

import com.twosigma.flint.timeseries.TimeSeriesRDD
import scala.concurrent.duration.MILLISECONDS

// Both frames carry a "group" column in addition to "time".
val left = Seq((1, 1L, 0.1), (1, 2L, 0.2), (3, 1L, 0.3), (3, 2L, 0.4)).toDF("group", "time", "valueA")
val right = Seq((1, 1L, 11), (1, 2L, 12), (3, 1L, 13), (3, 2L, 14)).toDF("group", "time", "valueB")
val leftTs = TimeSeriesRDD.fromDF(dataFrame = left)(isSorted = false, timeUnit = MILLISECONDS)
val rightTS = TimeSeriesRDD.fromDF(dataFrame = right)(isSorted = false, timeUnit = MILLISECONDS)

val mergedPerGroup = leftTs.leftJoin(rightTS, tolerance = "1s")

This fails due to duplicate columns.

When renaming the columns:

val left = Seq((1, 1L, 0.1), (1, 2L, 0.2), (3, 1L, 0.3), (3, 2L, 0.4)).toDF("groupA", "time", "valueA")
val right = Seq((1, 1L, 11), (1, 2L, 12), (3, 1L, 13), (3, 2L, 14)).toDF("groupB", "time", "valueB")
val leftTs = TimeSeriesRDD.fromDF(dataFrame = left)(isSorted = false, timeUnit = MILLISECONDS)
val rightTS = TimeSeriesRDD.fromDF(dataFrame = right)(isSorted = false, timeUnit = MILLISECONDS)

val mergedPerGroup = leftTs.leftJoin(rightTS, tolerance = "1s")
  mergedPerGroup.toDF.printSchema
  mergedPerGroup.toDF.show
+-------+------+------+------+------+
|   time|groupA|valueA|groupB|valueB|
+-------+------+------+------+------+
|1000000|     1|   0.1|     3|    13|
|1000000|     3|   0.3|     3|    13|
|2000000|     1|   0.2|     3|    14|
|2000000|     3|   0.4|     3|    14|
+-------+------+------+------+------+

The join is performed on time alone and ignores the group (groupB is 3 in every row), so the result has to be reduced manually:

mergedPerGroup.toDF.filter(col("groupA") === col("groupB")).show
+-------+------+------+------+------+
|   time|groupA|valueA|groupB|valueB|
+-------+------+------+------+------+
|1000000|     3|   0.3|     3|    13|
|2000000|     3|   0.4|     3|    14|
+-------+------+------+------+------+

Is there built-in functionality to perform this type of join more efficiently?

@icexelloss
Member

leftJoin takes a "key" argument that lets you specify a secondary join key (equality only).


geoHeil commented Nov 1, 2018

Thanks.

One further question: when using a rather large tolerance (i.e. multiple values from the right fall into the interval of a left row), the join only attaches a single record from the right to each left row. That means the distinct I previously used is not required.

leftTs.leftJoin(rightTS, tolerance = "1s", key = Seq("group")).toDF.show
+-------+-----+------+------+
|   time|group|valueA|valueB|
+-------+-----+------+------+
|1000000|    1|   0.1|    11|
|1000000|    3|   0.3|    13|
|2000000|    1|   0.2|    12|
|2000000|    3|   0.4|    14|
+-------+-----+------+------+

This looks better. With a much larger tolerance the result is the same:

leftTs.leftJoin(rightTS, tolerance = "1hour", key = Seq("group")).toDF.show
+-------+-----+------+------+
|   time|group|valueA|valueB|
+-------+-----+------+------+
|1000000|    1|   0.1|    11|
|1000000|    3|   0.3|    13|
|2000000|    1|   0.2|    12|
|2000000|    3|   0.4|    14|
+-------+-----+------+------+


geoHeil commented Nov 1, 2018

I believe this is also stated in the documentation:

leftJoin: A function performs the temporal left-join to the right TimeSeriesRDD, i.e. left-join using inexact timestamp matches. For each row in the left, append the most recent row from the right at or before the same time. An example to join two TimeSeriesRDDs is as follows.
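
To make the quoted semantics concrete, here is a minimal plain-Scala sketch of a keyed "most recent row at or before" left join over the sample data from this thread. It does not use Flint and is not how Flint implements leftJoin; the names LeftRow, RightRow, AsOfJoinSketch and leftJoinByKey are hypothetical, and the tolerance is assumed to be in the same unit as the time column (milliseconds here).

```scala
// Illustration only: plain Scala collections, not Flint's API or implementation.
final case class LeftRow(group: Int, time: Long, valueA: Double)
final case class RightRow(group: Int, time: Long, valueB: Int)

object AsOfJoinSketch {
  /** For each left row, pick the most recent right row with the same group
    * whose time lies in [left.time - tolerance, left.time]. */
  def leftJoinByKey(
      left: Seq[LeftRow],
      right: Seq[RightRow],
      tolerance: Long // assumed to be in the same unit as `time`
  ): Seq[(LeftRow, Option[RightRow])] =
    left.map { l =>
      val candidates = right.filter { r =>
        r.group == l.group && r.time <= l.time && r.time >= l.time - tolerance
      }
      // At most one right row is kept per left row, so no distinct is needed.
      val best = if (candidates.isEmpty) None else Some(candidates.maxBy(_.time))
      l -> best
    }

  def main(args: Array[String]): Unit = {
    val left = Seq(LeftRow(1, 1L, 0.1), LeftRow(1, 2L, 0.2), LeftRow(3, 1L, 0.3), LeftRow(3, 2L, 0.4))
    val right = Seq(RightRow(1, 1L, 11), RightRow(1, 2L, 12), RightRow(3, 1L, 13), RightRow(3, 2L, 14))
    // A tolerance of 1000 ms covers every earlier right row of the same group.
    leftJoinByKey(left, right, tolerance = 1000L).foreach { case (l, r) =>
      println(s"${l.time} ${l.group} ${l.valueA} ${r.map(_.valueB)}")
    }
  }
}
```

On this data every left row has a right row of the same group at exactly the same time, so widening the tolerance does not change which row is selected. That is consistent with the identical "1s" and "1hour" outputs shown above.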

@geoHeil geoHeil closed this as completed Nov 1, 2018