In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
# Get (or reuse) a session named "Ex_" against the master at spark-master:7077.
spark = (
    SparkSession.builder
    .appName("Ex_")
    .master("spark://spark-master:7077")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/08 15:08:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Problem

Here we're going to identify winning and losing streaks. We'll work with a dataset that has two columns, `event_date` and `result`, like this:

| Event date | Result |
| -------- | -------- |
| 2023-03-07   | Win   |
| 2023-03-08   | Win   |
| 2023-03-09   | Win   |
| 2023-03-10   | Win   |
| 2023-03-11   | Lose   |
| 2023-03-12   | Lose   |
| 2023-03-13   | Lose   |
| 2023-03-14   | Lose   |
| 2023-03-15   | Win   |
| 2023-03-16   | Win   |
| 2023-03-17   | Win   |


The goal is to transform it into a table like this:

| Result | Start Date | End Date | Count |
| -------- | -------- | -------- | -------- |
| Win   | 2023-03-07   | 2023-03-10   | 4 |
| Lose   | 2023-03-11   | 2023-03-14  | 4 |
| Win   | 2023-03-15   | 2023-03-17   | 3 |


In [2]:

# Sample input: one (date string, result) record per calendar day.
data = [
    ("2024-03-01", "Win"),
    ("2024-03-02", "Win"),
    ("2024-03-03", "Win"),
    ("2024-03-04", "Lose"),
    ("2024-03-05", "Win"),
    ("2024-03-06", "Win"),
    ("2024-03-07", "Lose"),
    ("2024-03-08", "Lose"),
    ("2024-03-09", "Lose"),
    ("2024-03-10", "Lose"),
    ("2024-03-11", "Lose"),
    ("2024-03-12", "Lose"),
    ("2024-03-13", "Win"),
    ("2024-03-14", "Win"),
    ("2024-03-15", "Win"),
    ("2024-03-16", "Win"),
    ("2024-03-17", "Win"),
    ("2024-03-18", "Win"),
    ("2024-03-19", "Lose"),
    ("2024-03-20", "Lose"),
]

# Build the DataFrame and turn the date strings into real dates in one expression.
df = (
    spark.createDataFrame(data, ["Event_date", "Result"])
    .withColumn("Event_date", to_date(col("Event_date")))
)

# Quick sanity check of the content and the resulting schema.
df.show(3)
df.printSchema()

# ////////////////////////////////////////////////////////////////////////////////////////////////////////////////


# Step 1: find the rows where the result changes.
# LAG over a date-ordered window exposes the previous row's result next to the current one.
window_spec = Window.orderBy("Event_date")
prev_result_col = lag(col("Result")).over(window_spec)
lag_table = df.withColumn("Prev_status", prev_result_col)

def compare(x,y):
    """Flag whether ``x`` differs from ``y``.

    Returns 0 when the two values are equal or when ``y`` is None,
    and 1 in every other case.
    """
    unchanged = x == y or y is None
    return 0 if unchanged else 1
# Wrap `compare` as a Spark UDF with an explicit integer return type.
# When no return type is given, `udf` produces a string column, so the flag would be
# stored as "0"/"1" text and any arithmetic on it would rely on an implicit cast.
compare_udf = udf(compare, "int")

# Event_Change is 1 on rows whose Result differs from the previous row's, else 0.
# Prev_status is only needed to compute the flag, so it is dropped by the select.
lag_table = lag_table.withColumn(
    "Event_Change",
    compare_udf(
        col("Result"),
        col("Prev_status")
    )
).select("Event_date","Result","Event_Change")

# ////////////////////////////////////////////////////////////////////////////////////////////////////////////////

# Step 2: take a running sum of the Event_Change flag over the date-ordered window.
# Every row of the same streak ends up with the same running total, which serves as a streak id.
running_change_total = sum(col("Event_Change")).over(window_spec)
group_table = lag_table.withColumn("Group", running_change_total)

# ////////////////////////////////////////////////////////////////////////////////////////////////////////////////

# Step 3: collapse each streak (Group, Result) into a single row holding its first and last date.
group_table_ = group_table.groupBy("Group","Result") \
    .agg(
        min("Event_date").alias("Initial_date"),
        max("Event_date").alias("End_date")
)

# Streak_Days is the inclusive calendar span between the first and last date of the streak.
# Row order after a groupBy is not guaranteed, so sort explicitly to list streaks chronologically.
group_table_final = (
    group_table_
    .withColumn("Streak_Days", datediff(col("End_date"), col("Initial_date")) + 1)
    .select("Result","Initial_date","End_date","Streak_Days")
    .orderBy("Initial_date")
)
group_table_final.show()

                                                                                

+----------+------+
|Event_date|Result|
+----------+------+
|2024-03-01|   Win|
|2024-03-02|   Win|
|2024-03-03|   Win|
+----------+------+
only showing top 3 rows

root
 |-- Event_date: date (nullable = true)
 |-- Result: string (nullable = true)



24/03/08 15:08:49 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/03/08 15:08:49 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/03/08 15:08:49 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/03/08 15:08:49 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/03/08 15:08:54 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/03/08 15:08:54 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/03/08 1

+------+------------+----------+-----------+
|Result|Initial_date|  End_date|Streak_Days|
+------+------------+----------+-----------+
|   Win|  2024-03-01|2024-03-03|          3|
|  Lose|  2024-03-04|2024-03-04|          1|
|   Win|  2024-03-05|2024-03-06|          2|
|  Lose|  2024-03-07|2024-03-12|          6|
|   Win|  2024-03-13|2024-03-18|          6|
|  Lose|  2024-03-19|2024-03-20|          2|
+------+------------+----------+-----------+



                                                                                

<hr>

In [11]:
# Same streak computation, expressed purely in Spark SQL.
df.createOrReplaceTempView("events")
sql_result = spark.sql(
    """
    WITH flagged AS (
        -- 1 on rows whose Result differs from the previous row's Result, else 0.
        -- The first row has no predecessor: the comparison is NULL, so it falls to ELSE 0.
        SELECT
            Event_date,
            Result,
            CASE WHEN Result != LAG(Result) OVER (ORDER BY Event_date) THEN 1 ELSE 0 END AS is_change
        FROM
            events
    )
    ,
    streaks AS (
        -- Running total of the change flag: constant within a streak, so it works as a streak id.
        SELECT Event_date, Result, SUM(is_change) OVER (ORDER BY Event_date) AS streak_id
        FROM flagged
    )
    SELECT
        Result,
        MIN(Event_date) as Initial,
        MAX(Event_date) as End,
        count(*) as Duration_in_days
    FROM streaks
    GROUP BY
        streak_id, Result
    -- Aggregation output has no guaranteed order; sort so streaks are listed chronologically.
    ORDER BY
        Initial
    """
)
sql_result.show()

24/03/08 15:21:06 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/03/08 15:21:06 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/03/08 15:21:06 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/03/08 15:21:06 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/03/08 15:21:06 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/03/08 15:21:06 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/03/08 1

+------+----------+----------+----------------+
|Result|   Initial|       End|Duration_in_days|
+------+----------+----------+----------------+
|   Win|2024-03-01|2024-03-03|               3|
|  Lose|2024-03-04|2024-03-04|               1|
|   Win|2024-03-05|2024-03-06|               2|
|  Lose|2024-03-07|2024-03-12|               6|
|   Win|2024-03-13|2024-03-18|               6|
|  Lose|2024-03-19|2024-03-20|               2|
+------+----------+----------+----------------+



                                                                                