In [2]:
import sys
from pyspark.sql import SparkSession, functions as F
import pandas


In [102]:
spark = (
    SparkSession
    .builder
    # .master('dev-java.home.lan')
    .master('local[4]')
    .appName('mySpark')
    .getOrCreate()
)


In [134]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType, TimestampType, BooleanType
from pyspark.sql import Window

sm = StructType(fields = [
    StructField("lc", StringType()),
    StructField("ln", StringType()),
    StructField("lns", StringType()),
    StructField("activeFlag", StringType()),
    StructField("sd", DateType()),
    StructField("ed", DateType()),
    #StructField("cd", StringType()),
    #StructField("ca", StringType()),
    #StructField("snapshot_", StringType()),
    #StructField("cv", TimestampType()),
    #StructField("cl", IntegerType())
])
df = spark.read.csv("sbrf.csv", schema=sm, sep=',')
df.orderBy(["lc","ln", "sd"]).show(25, truncate=False)


+----+------+-------+----------+----------+----------+
|lc  |ln    |lns    |activeFlag|sd        |ed        |
+----+------+-------+----------+----------+----------+
|lc-1|ln-1-1|lns-1-1|1         |2020-12-25|2021-01-01|
|lc-1|ln-1-2|lns-1-2|1         |2021-01-02|2021-01-31|
|lc-1|ln-1-3|lns-1-3|1         |2021-02-01|2079-06-01|
|lc-2|ln-2-1|lns-2-1|0         |2020-12-02|2024-12-01|
|lc-2|ln-2-2|lns-2-2|0         |2024-12-02|2024-12-04|
|lc-2|ln-2-3|lns-2-3|0         |2024-12-05|2099-01-01|
|lc-3|ln-3-1|lns-3-1|0         |2024-12-02|2019-01-02|
|lc-4|ln-4-1|lns-4-1|1         |2020-01-01|2021-01-02|
|lc-5|ln-5-1|lns-5-1|1         |2020-01-01|2021-01-02|
|lc-5|ln-5-2|lns-5-2|1         |2020-01-03|2079-06-06|
+----+------+-------+----------+----------+----------+



In [136]:
w = Window.partitionBy("lc").orderBy("sd")
df = (df.withColumn("af", F.col("activeFlag").cast(BooleanType()))
    .withColumn("nv", F.lead("sd").over(w)).where('1=1 and sd <= ed'))
#df_ = df.withColumn("dtn", F.to_timestamp("dt1", "dd-MM-yyyy HH:mm:ss"))
# df_.printSchema()
df.orderBy(["lc","ln", "sd"]).show(25, truncate=False)


+----+------+-------+----------+----------+----------+-----+----------+
|lc  |ln    |lns    |activeFlag|sd        |ed        |af   |nv        |
+----+------+-------+----------+----------+----------+-----+----------+
|lc-1|ln-1-1|lns-1-1|1         |2020-12-25|2021-01-01|true |2021-01-02|
|lc-1|ln-1-2|lns-1-2|1         |2021-01-02|2021-01-31|true |2021-02-01|
|lc-1|ln-1-3|lns-1-3|1         |2021-02-01|2079-06-01|true |NULL      |
|lc-2|ln-2-1|lns-2-1|0         |2020-12-02|2024-12-01|false|2024-12-02|
|lc-2|ln-2-2|lns-2-2|0         |2024-12-02|2024-12-04|false|2024-12-05|
|lc-2|ln-2-3|lns-2-3|0         |2024-12-05|2099-01-01|false|NULL      |
|lc-4|ln-4-1|lns-4-1|1         |2020-01-01|2021-01-02|true |NULL      |
|lc-5|ln-5-1|lns-5-1|1         |2020-01-01|2021-01-02|true |2020-01-03|
|lc-5|ln-5-2|lns-5-2|1         |2020-01-03|2079-06-06|true |NULL      |
+----+------+-------+----------+----------+----------+-----+----------+



In [137]:
#df1 = (df.select("lc","ln","lns","af","case af when af then false else true end as af1","case when af = true then ed when af = false then cast('1900-01-01' as date) end as sd"))
df1 = (df.select("lc","ln","lns","af"
                 , F.when(df['af'] == True, False).otherwise(True).alias("af1")
                 , "sd", "ed"
                 , F.when(df['af'] == True, df.ed).otherwise('1900-01-01').alias("dt_")
                 #, F.when(df['af'] == True, df.ed).otherwise('1900-01-01').alias("dt_")
        )
    .where("nv is null and ed <> cast('2079-06-06' as date)")
)
df1.orderBy(["lc","ln", "dt_"]).show()
df2 = (df.select("lc","ln","lns","af"
                 #, "af as af1"
                 , df.af.alias("af1")
                 , "sd", "ed"
                 , F.when(( (df.af == False) & (df.nv.isNull()) & (df.ed != '2079-06-06') ), df.ed ).otherwise(df.sd).alias("dt_")
                 #, F.when(df['af'] == True, df.ed).otherwise('1900-01-01').alias("dt_")
            )
        )
df2.orderBy(["lc","ln", "dt_"]).show()


df1 = (df.select("lc","ln","lns","af"
                 , F.when(df['af'] == True, False).otherwise(True).alias("af1")
                 , "sd", "ed"
                 , F.when(df['af'] == True, df.ed).otherwise('1900-01-01').alias("dt_")
                 #, F.when(df['af'] == True, df.ed).otherwise('1900-01-01').alias("dt_")
        )
    .where("nv is null and ed <> cast('2079-06-06' as date)")
    .union(
        (df.select("lc","ln","lns","af"
                 #, "af as af1"
                 , df.af.alias("af1")
                 , "sd", "ed"
                 , F.when(( (df.af == False) & (df.nv.isNull()) & (df.ed != '2079-06-06') ), df.ed ).otherwise(df.sd).alias("dt_")
                 #, F.when(df['af'] == True, df.ed).otherwise('1900-01-01').alias("dt_")
            )
        )
    )
)

#df.withColumn("nv", F.lead("sd").over(w)).where('1=1 and sd <= ed').show()
df1.orderBy(["lc","ln", "dt_"]).show()

+----+------+-------+-----+-----+----------+----------+----------+
|  lc|    ln|    lns|   af|  af1|        sd|        ed|       dt_|
+----+------+-------+-----+-----+----------+----------+----------+
|lc-1|ln-1-3|lns-1-3| true|false|2021-02-01|2079-06-01|2079-06-01|
|lc-2|ln-2-3|lns-2-3|false| true|2024-12-05|2099-01-01|1900-01-01|
|lc-4|ln-4-1|lns-4-1| true|false|2020-01-01|2021-01-02|2021-01-02|
+----+------+-------+-----+-----+----------+----------+----------+

+----+------+-------+-----+-----+----------+----------+----------+
|  lc|    ln|    lns|   af|  af1|        sd|        ed|       dt_|
+----+------+-------+-----+-----+----------+----------+----------+
|lc-1|ln-1-1|lns-1-1| true| true|2020-12-25|2021-01-01|2020-12-25|
|lc-1|ln-1-2|lns-1-2| true| true|2021-01-02|2021-01-31|2021-01-02|
|lc-1|ln-1-3|lns-1-3| true| true|2021-02-01|2079-06-01|2021-02-01|
|lc-2|ln-2-1|lns-2-1|false|false|2020-12-02|2024-12-01|2020-12-02|
|lc-2|ln-2-2|lns-2-2|false|false|2024-12-02|2024-12-04|2024-1