In [0]:
#Find Start and End date for each Consecutive Event Status Window

from pyspark.sql.functions import *

# Sample input: one row per calendar day with that day's outcome.
data = [
    ("2020-06-01", "Won"),
    ("2020-06-02", "Won"),
    ("2020-06-03", "Won"),
    ("2020-06-04", "Lost"),
    ("2020-06-05", "Lost"),
    ("2020-06-06", "Lost"),
    ("2020-06-07", "Won"),
]

# Build the DataFrame and cast the string event_date column to a date type
# in a single chain.
df = (
    spark.createDataFrame(data, ["event_date", "event_status"])
    .withColumn("event_date", to_date(col("event_date")))
)

df.display()

event_date,event_status
2020-06-01,Won
2020-06-02,Won
2020-06-03,Won
2020-06-04,Lost
2020-06-05,Lost
2020-06-06,Lost
2020-06-07,Won


In [0]:
from pyspark.sql.window import Window

# Find the rows where the status changes.
# "event_change" is 1 when event_status differs from the previous row's
# event_status (rows ordered by event_date) and 0 otherwise. The first row has
# no previous row, so lag() yields NULL, the comparison is not true, and the
# row falls through to otherwise(0).
by_date = Window.orderBy("event_date")
previous_status = lag("event_status", 1).over(by_date)
status_changed = col("event_status") != previous_status

df1 = df.withColumn("event_change", when(status_changed, 1).otherwise(0))

df1.display()

event_date,event_status,event_change
2020-06-01,Won,0
2020-06-02,Won,0
2020-06-03,Won,0
2020-06-04,Lost,1
2020-06-05,Lost,0
2020-06-06,Lost,0
2020-06-07,Won,1


In [0]:
#or: same flag as above, but keep the lagged status as a visible column
windowSpec = Window.orderBy("event_date")

# prev_status holds the previous row's status (NULL on the first row);
# event_change is 1 only where it differs from the current status.
df1 = (
    df.withColumn("prev_status", lag("event_status", 1).over(windowSpec))
      .withColumn(
          "event_change",
          when(col("prev_status") != col("event_status"), 1).otherwise(0),
      )
)

df1.display()


event_date,event_status,prev_status,event_change
2020-06-01,Won,,0
2020-06-02,Won,Won,0
2020-06-03,Won,Won,0
2020-06-04,Lost,Won,1
2020-06-05,Lost,Lost,0
2020-06-06,Lost,Lost,0
2020-06-07,Won,Lost,1


In [0]:
# Create a column "event_group" as the cumulative (running) sum of event_change.
# Every status flip adds 1 to the running total, so all rows that belong to
# the same consecutive streak end up with the same event_group value.
# The frame below is the one Spark applies by default to an ordered window;
# it is only spelled out here for readability.
running_frame = (
    Window.orderBy("event_date")
    .rangeBetween(Window.unboundedPreceding, Window.currentRow)
)

df2 = df1.withColumn("event_group", sum("event_change").over(running_frame))
df2.display()


event_date,event_status,prev_status,event_change,event_group
2020-06-01,Won,,0,0
2020-06-02,Won,Won,0,0
2020-06-03,Won,Won,0,0
2020-06-04,Lost,Won,1,1
2020-06-05,Lost,Lost,0,1
2020-06-06,Lost,Lost,0,1
2020-06-07,Won,Lost,1,2


In [0]:
#First & Last Window Function : Calculate Event Start and End Date

# first()/last() used as plain groupBy aggregates return whichever row Spark
# happens to see first/last after the shuffle, so the result is not guaranteed.
# Applying them as window functions over a frame that is ordered by event_date
# and spans the whole group makes "first" the earliest date and "last" the
# latest date deterministically.
group_frame = (
    Window.partitionBy("event_group")
    .orderBy("event_date")
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)

output_df = (
    df2.withColumn("start_date", first("event_date").over(group_frame))
    .withColumn("end_date", last("event_date").over(group_frame))
    # every row of a group now carries the same bounds; collapse to one row per group
    .select("event_group", "event_status", "start_date", "end_date")
    .distinct()
    .drop("event_group")  # dropping unwanted column
)

output_df.display()

event_status,start_date,end_date
Won,2020-06-01,2020-06-03
Lost,2020-06-04,2020-06-06
Won,2020-06-07,2020-06-07


In [0]:
#or: take the earliest and latest date of each streak with min/max

# One output row per (event_group, event_status) pair; event_group is only a
# grouping helper, so it is removed from the final result.
streak_keys = ["event_group", "event_status"]

output_df = (
    df2.groupBy(*streak_keys)
    .agg(
        min("event_date").alias("start_date"),
        max("event_date").alias("end_date"),
    )
    .drop("event_group")
)

output_df.display()

event_status,start_date,end_date
Won,2020-06-01,2020-06-03
Lost,2020-06-04,2020-06-06
Won,2020-06-07,2020-06-07


## Using Spark SQL

In [0]:
# Register the input DataFrame as a temporary view named "events" so that the
# %sql cells can query it by name. If a view with that name already exists,
# it is replaced.

df.createOrReplaceTempView("events")

In [0]:
%sql

-- Step 1: attach the previous row's status (ordered by event_date) to every row.
with with_prev as (
  select
    event_date,
    event_status,
    lag(event_status, 1) over (order by event_date) as prev_event
  from events
),
-- Step 2: flag rows whose status differs from the previous row's status.
-- The first row has a NULL prev_event, so the comparison is not true and it gets 0.
flagged as (
  select
    event_date,
    event_status,
    prev_event,
    case when event_status != prev_event then 1 else 0 end as event_change
  from with_prev
),
-- Step 3: the running sum of the flags gives each consecutive streak its own group id.
grouped as (
  select
    event_date,
    event_status,
    prev_event,
    event_change,
    sum(event_change) over (order by event_date) as event_group
  from flagged
)

-- Earliest and latest date of every streak.
select
  event_status,
  min(event_date) as start_date,
  max(event_date) as end_date
from grouped
group by event_group, event_status


event_status,start_date,end_date
Won,2020-06-01,2020-06-03
Lost,2020-06-04,2020-06-06
Won,2020-06-07,2020-06-07


In [0]:
%sql

--or--
--using First and Last as window functions
--
-- first()/last() used as GROUP BY aggregates depend on the order in which rows
-- reach the aggregate, which Spark does not guarantee. Here they are evaluated
-- over a window that is ordered by event_date and spans the whole group, so
-- "first" is always the earliest date and "last" is always the latest date.

with cte1 as (
  select event_date,event_status,
  lag(event_status,1) over (order by event_date) as prev_event,
  case
   when event_status!=lag(event_status,1) over (order by event_date) then 1 else 0
  end as event_change
  from events
),
cte2 as (
  select event_date,event_status,prev_event,event_change,
  sum(event_change) over (order by event_date) as event_group
  from cte1
),
cte3 as (
  -- every row of a group carries the same bounds; distinct collapses them to one row per group
  select distinct
    event_group,
    event_status,
    first(event_date) over (
      partition by event_group
      order by event_date
      rows between unbounded preceding and unbounded following
    ) as start_date,
    last(event_date) over (
      partition by event_group
      order by event_date
      rows between unbounded preceding and unbounded following
    ) as end_date
  from cte2
)

select event_status,start_date,end_date
from cte3

event_status,start_date,end_date
Won,2020-06-01,2020-06-03
Lost,2020-06-04,2020-06-06
Won,2020-06-07,2020-06-07
