<a href="https://colab.research.google.com/github/pcbzmani/SQL_Learning/blob/main/null_safe_and_leadlag.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m4.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812365 sha256=0f4eaca2b49222d14bf34bfe50580a52d6ac53a13b504adfdcff9afa15db1abb
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.2


In [None]:
# prompt: create a spark dataframe with some null and valid values

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.appName("NullAndNotNullExample").getOrCreate()

data = [("James Smith", 30, "Block1"),
        ("Michael Rose,", 40, "Block2"),
        ("Robert Williams", 50, None),
        ("Maria Anne", 60, "Block1"),
        ("Jen Brown", 70, "Block2")
       ]

schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("address", StringType(), True)
])

df = spark.createDataFrame(data=data, schema=schema)

# df.show(truncate=False)
df.createOrReplaceTempView('customer')

+---------------+---+-------+
|name           |age|address|
+---------------+---+-------+
|James Smith    |30 |Block1 |
|Michael Rose,  |40 |Block2 |
|Robert Williams|50 |NULL   |
|Maria Anne     |60 |Block1 |
|Jen Brown      |70 |Block2 |
+---------------+---+-------+



In [None]:
df = spark.sql(f"""
    select * from customer
    where address != 'Block1'
""")

In [None]:
df.show(truncate = False)

+-------------+---+-------+
|name         |age|address|
+-------------+---+-------+
|Michael Rose,|40 |Block2 |
|Jen Brown    |70 |Block2 |
+-------------+---+-------+



In [None]:
df1 = spark.sql(f"""
    select * from customer
    where address = 'Block1'
""")

In [None]:
df1.show(truncate = False)

+-----------+---+-------+
|name       |age|address|
+-----------+---+-------+
|James Smith|30 |Block1 |
|Maria Anne |60 |Block1 |
+-----------+---+-------+



In [None]:
df2 = spark.sql(f"""
    select * from customer
    where !(address <=> 'Block1')
""")

In [None]:
df2.show(truncate = False)

+---------------+---+-------+
|name           |age|address|
+---------------+---+-------+
|Michael Rose,  |40 |Block2 |
|Robert Williams|50 |NULL   |
|Jen Brown      |70 |Block2 |
+---------------+---+-------+



In [None]:
df3 = spark.sql(f"""
    select * from customer
    where coalesce(address, 'Empty') != 'Block1'
""")

In [None]:
df3.show()

+---------------+---+-------+
|           name|age|address|
+---------------+---+-------+
|  Michael Rose,| 40| Block2|
|Robert Williams| 50|   NULL|
|      Jen Brown| 70| Block2|
+---------------+---+-------+



In [None]:
spark.sql(" Select 1 = 1, NULL = NULL, 2 = NULL").show(truncate=False)

+-------+-------------+----------+
|(1 = 1)|(NULL = NULL)|(2 = NULL)|
+-------+-------------+----------+
|true   |NULL         |NULL      |
+-------+-------------+----------+



In [None]:
spark.sql(" Select 1 <=> 1, NULL <=> NULL, 2 <=> NULL").show(truncate=False)

+---------+---------------+------------+
|(1 <=> 1)|(NULL <=> NULL)|(2 <=> NULL)|
+---------+---------------+------------+
|true     |true           |false       |
+---------+---------------+------------+



In [9]:
###

# prompt: create a spark dataframe with some null and valid values

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType

spark = SparkSession.builder.appName("leadlagtable").getOrCreate()

data = [(1,'2024-01-01','2024-01-29')
        ,(1,'2024-02-15','2024-03-29')
        ,(1,'2024-05-01','2024-06-29')
        ,(1,'2024-10-01','2024-12-29')
        ,(2,'2024-01-01','2024-01-15')
        ,(2,'2024-02-15','2024-03-29')
        ,(2,'2024-06-01','2024-06-29')
        ,(2,'2024-09-01','2024-12-29')
       ]

schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("start_dt", StringType(), True),
    StructField("end_dt", StringType(), True)
])


In [10]:
df = spark.createDataFrame(data=data, schema=schema)

# df.show(truncate=False)
df.createOrReplaceTempView('dateorder')

In [11]:
df = spark.sql("""with new_dt as(
select
    id
	 ,start_dt
    ,end_dt
    ,lag(date_add(to_date(end_dt,'yyyy-MM-dd'),  1)) over(partition by id order by end_dt) as new_start_dt
    ,lead(date_sub(to_date(start_dt,'yyyy-MM-dd'),  1 )) over(partition by id order by start_dt) as new_end_dt
 from dateorder
 ),
 main_dt as (
 select
   id
   ,start_dt
   ,end_dt
   ,new_start_dt
   ,lag(new_end_dt) over(partition by id order by start_dt) as new_end_date
 from new_dt
 )
 select * from (
 select
	id
    ,new_start_dt as start_dt
    ,new_end_date as end_dt
    from main_dt
    where new_start_dt is not null
    union
    select
	id
    ,start_dt
    ,end_dt
    from dateorder
   ) a
   order by a.id
   ,a.start_dt
""")
df.show()

+---+----------+----------+
| id|  start_dt|    end_dt|
+---+----------+----------+
|  1|2024-01-01|2024-01-29|
|  1|2024-01-30|2024-02-14|
|  1|2024-02-15|2024-03-29|
|  1|2024-03-30|2024-04-30|
|  1|2024-05-01|2024-06-29|
|  1|2024-06-30|2024-09-30|
|  1|2024-10-01|2024-12-29|
|  2|2024-01-01|2024-01-15|
|  2|2024-01-16|2024-02-14|
|  2|2024-02-15|2024-03-29|
|  2|2024-03-30|2024-05-31|
|  2|2024-06-01|2024-06-29|
|  2|2024-06-30|2024-08-31|
|  2|2024-09-01|2024-12-29|
+---+----------+----------+

