In [2]:
println("Start")

Start


# 1 - Sum consecutive value by Seq

In [8]:
import org.apache.spark.sql.expressions.Window

import org.apache.spark.sql.expressions.Window


In [3]:
val df = Seq(
  (1, 10, 0),
  (1, 11, 1),
  (1, 13, 1),
  (1, 16, 1),
  (1, 20, 0),
  (1, 21, 0),
  (1, 22, 1),
  (1, 25, 1),
  (1, 27, 1),
  (1, 29, 1),
  (1, 30, 0),
  (1, 32, 1),
  (1, 34, 1),
  (1, 35, 1),
  (1, 38, 0)).toDF("Category", "Value", "Sequences")
df.show

+--------+-----+---------+
|Category|Value|Sequences|
+--------+-----+---------+
|       1|   10|        0|
|       1|   11|        1|
|       1|   13|        1|
|       1|   16|        1|
|       1|   20|        0|
|       1|   21|        0|
|       1|   22|        1|
|       1|   25|        1|
|       1|   27|        1|
|       1|   29|        1|
|       1|   30|        0|
|       1|   32|        1|
|       1|   34|        1|
|       1|   35|        1|
|       1|   38|        0|
+--------+-----+---------+



df: org.apache.spark.sql.DataFrame = [Category: int, Value: int ... 1 more field]


assing each row unique id

In [6]:
val zipped = df.withColumn("zip", monotonically_increasing_id())
zipped.show

+--------+-----+---------+---+
|Category|Value|Sequences|zip|
+--------+-----+---------+---+
|       1|   10|        0|  0|
|       1|   11|        1|  1|
|       1|   13|        1|  2|
|       1|   16|        1|  3|
|       1|   20|        0|  4|
|       1|   21|        0|  5|
|       1|   22|        1|  6|
|       1|   25|        1|  7|
|       1|   27|        1|  8|
|       1|   29|        1|  9|
|       1|   30|        0| 10|
|       1|   32|        1| 11|
|       1|   34|        1| 12|
|       1|   35|        1| 13|
|       1|   38|        0| 14|
+--------+-----+---------+---+



zipped: org.apache.spark.sql.DataFrame = [Category: int, Value: int ... 2 more fields]


make range from zero to the next zero

In [9]:
val categoryWindow = Window.partitionBy("Category").orderBy($"zip")
val groups = zipped
             .filter($"Sequences" === 0)
             .withColumn("rangeEnd", lead($"zip",1).over(categoryWindow))
             .withColumnRenamed("zip", "rangeStart")
groups.show(false)

+--------+-----+---------+----------+--------+
|Category|Value|Sequences|rangeStart|rangeEnd|
+--------+-----+---------+----------+--------+
|1       |10   |0        |0         |4       |
|1       |20   |0        |4         |5       |
|1       |21   |0        |5         |10      |
|1       |30   |0        |10        |14      |
|1       |38   |0        |14        |null    |
+--------+-----+---------+----------+--------+



categoryWindow: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@5f58cd28
groups: org.apache.spark.sql.DataFrame = [Category: int, Value: int ... 3 more fields]


assing range for each unit

In [10]:
val joinCondition = ($"units.zip" > $"groups.rangeStart").and($"units.zip" < $"groups.rangeEnd")

joinCondition: org.apache.spark.sql.Column = ((units.zip > groups.rangeStart) AND (units.zip < groups.rangeEnd))


In [11]:
val unitsByRange = zipped
                    .filter($"Sequences" === 1).alias("units")
                    .join(groups.alias("groups"), joinCondition, "left")
                    .select("units.Category", "units.Value", "groups.rangeStart")

unitsByRange: org.apache.spark.sql.DataFrame = [Category: int, Value: int ... 1 more field]


In [12]:
unitsByRange.show

+--------+-----+----------+
|Category|Value|rangeStart|
+--------+-----+----------+
|       1|   11|         0|
|       1|   13|         0|
|       1|   16|         0|
|       1|   22|         5|
|       1|   25|         5|
|       1|   27|         5|
|       1|   29|         5|
|       1|   32|        10|
|       1|   34|        10|
|       1|   35|        10|
+--------+-----+----------+



group by range

In [13]:
val result = unitsByRange
                .groupBy($"Category", $"rangeStart")
                .agg(sum("Value").alias("summing"))
                .orderBy("rangeStart")
                .drop("rangeStart")

result: org.apache.spark.sql.DataFrame = [Category: int, summing: bigint]


In [14]:
result.show(false)

+--------+-------+
|Category|summing|
+--------+-------+
|1       |40     |
|1       |103    |
|1       |101    |
+--------+-------+



# 2 - Group By (And sum) consecutive Time Intervals

In [44]:
import org.apache.spark.sql.functions
import org.apache.spark.sql.expressions.Window

import org.apache.spark.sql.functions
import org.apache.spark.sql.expressions.Window


In [45]:
val df = Seq((0, "2016-07-02 12:01:40"),
 (1, "2016-07-02 12:21:23"),
 (1, "2016-07-02 13:22:56"),
 (1, "2016-07-02 13:27:07"),
 (0, "2016-07-02 13:30:12"),
 (0, "2016-07-02 13:40:34"),
 (1, "2016-07-02 13:57:07"),
 (1, "2016-07-02 14:08:07")).
toDF("signal", "timestamp").
withColumn("timestamp", functions.to_timestamp(col("timestamp"), "yyyy-MM-dd HH:mm:ss"))
df.show()

+------+-------------------+
|signal|          timestamp|
+------+-------------------+
|     0|2016-07-02 12:01:40|
|     1|2016-07-02 12:21:23|
|     1|2016-07-02 13:22:56|
|     1|2016-07-02 13:27:07|
|     0|2016-07-02 13:30:12|
|     0|2016-07-02 13:40:34|
|     1|2016-07-02 13:57:07|
|     1|2016-07-02 14:08:07|
+------+-------------------+



df: org.apache.spark.sql.DataFrame = [signal: int, timestamp: timestamp]


In [49]:
val newSignalWindow = Window.orderBy("timestamp")

newSignalWindow: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@3c4ac36b


In [50]:
val dfWithNewSignal = df.withColumn("new_signal",
                                    (functions.lag(col("signal"),1,0).over(newSignalWindow) === 0 && 
                                     col("signal") === 1).cast("bigint"))

dfWithNewSignal: org.apache.spark.sql.DataFrame = [signal: int, timestamp: timestamp ... 1 more field]


In [51]:
dfWithNewSignal.show

2019-08-21 11:47:04 WARN  WindowExec:66 - No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+------+-------------------+----------+
|signal|          timestamp|new_signal|
+------+-------------------+----------+
|     0|2016-07-02 12:01:40|         0|
|     1|2016-07-02 12:21:23|         1|
|     1|2016-07-02 13:22:56|         0|
|     1|2016-07-02 13:27:07|         0|
|     0|2016-07-02 13:30:12|         0|
|     0|2016-07-02 13:40:34|         0|
|     1|2016-07-02 13:57:07|         1|
|     1|2016-07-02 14:08:07|         0|
+------+-------------------+----------+



In [56]:
val dfWithIdSignal = dfWithNewSignal.
                            filter(col("signal") === 1).
                            withColumn("new_signal", 
                                       functions.
                                          sum("new_signal").
                                          over(newSignalWindow))

dfWithIdSignal: org.apache.spark.sql.DataFrame = [signal: int, timestamp: timestamp ... 1 more field]


In [57]:
dfWithIdSignal.show

2019-08-21 11:52:54 WARN  WindowExec:66 - No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
2019-08-21 11:52:54 WARN  WindowExec:66 - No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+------+-------------------+----------+
|signal|          timestamp|new_signal|
+------+-------------------+----------+
|     1|2016-07-02 12:21:23|         1|
|     1|2016-07-02 13:22:56|         1|
|     1|2016-07-02 13:27:07|         1|
|     1|2016-07-02 13:57:07|         2|
|     1|2016-07-02 14:08:07|         2|
+------+-------------------+----------+



In [61]:
val resultDF = dfWithIdSignal.
                  groupBy("new_signal").
                      agg(functions.min("timestamp").as("start_date"), functions.max("timestamp").as("end_date"),
                          functions.count("*").as("positive_count")).drop("new_signal")

resultDF: org.apache.spark.sql.DataFrame = [start_date: timestamp, end_date: timestamp ... 1 more field]


In [62]:
resultDF.show

2019-08-21 11:56:47 WARN  WindowExec:66 - No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
2019-08-21 11:56:47 WARN  WindowExec:66 - No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+-------------------+-------------------+--------------+
|         start_date|           end_date|positive_count|
+-------------------+-------------------+--------------+
|2016-07-02 12:21:23|2016-07-02 13:27:07|             3|
|2016-07-02 13:57:07|2016-07-02 14:08:07|             2|
+-------------------+-------------------+--------------+

